diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b2eefd4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,18 @@ +*.log +.*/ +*/** + +**/.idea/ +**/__pycache__/ +**/.pytest_cache/ +**/venv/ +**/.env +**/*.lock +**/mirror_cn.py + +*.egg-info/ +*.log +output/ +out/ +./test*.py +*.egg-info diff --git a/README.md b/README.md index a13693d..e7df608 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,23 @@ -# docker-drag +# docker-pull This repository contains Python scripts for interacting with Docker Hub or other registries, without needing the Docker client itself. It relies on the Docker registry [HTTPS API v2](https://docs.docker.com/registry/spec/api/). +## Use aria2c as downloader + +You need to install `aria2c` manually. + +```sh +# use pixi as package manager +pixi global install aria2c +# install docker-pull +uv pip install docker_pull[aria2]@git+https://github.com/aclon314/docker-pull.git +# example +docker-pull busybox +``` + +> Because some huge layers (>5GB) could easily failed to download, and docker/podman DON'T support resume download(though they support cached layers if the layers are 100% downloaded). + ## Pull a Docker image in HTTPS `python docker_pull.py hello-world` diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..86f7fd6 --- /dev/null +++ b/__init__.py @@ -0,0 +1,17 @@ +""" +Usage: +```python +puller = DockerPull( + args.image, + output_path=args.output_path, + registry_username=args.username, + registry_password=args.password, +) +puller.save_image(skip=args.skip, keep_tmp=args.verbose) +``` +""" + +from .docker_pull import ( + DockerImage, + DockerPull, +) diff --git a/docker_pull.py b/docker_pull.py old mode 100644 new mode 100755 index 8484f3f..7403f8e --- a/docker_pull.py +++ b/docker_pull.py @@ -1,204 +1,836 @@ -import os -import sys -import gzip -from io import BytesIO -import json -import hashlib -import shutil +#!/bin/env python +# PYTHON_ARGCOMPLETE_OK +""" +# contarner-pull + +This repository contains Python scripts for interacting with Docker Hub or other registries, without needing the Docker client itself. A fork of https://github.com/NotGlop/docker-drag as a module and better CLI interaction. + +It relies on the Docker registry [HTTPS API v2](https://docs.docker.com/registry/spec/api/). + +Recommand to install `argcomplete aria2p[gui]` for shell tab completion and faster & **resume download**. + +## Updates + +Fixes from https://github.com/heran226813/docker-drag (centralised request session and retry logic) and https://github.com/lenrys29/docker-drag (authentication for private image registry and Nexus OSS) are included. Also componentise functions and image URL separation for better readability, as well as allowing other scripts to import this file as a module. + +## CLI interaction + +To use this script, you can run it from the command line with the following arguments: +`python3 contarner_pull.py --username USERNAME --password PASSWORD [registry/][repository/]image[:tag|@digest]" out.tar` + +See the full help with `python3 contarner_pull.py --help`. +You may also set the environment variables `REGISTRY_USERNAME` and `REGISTRY_PASSWORD` to avoid typing them in the command line. + +## Module import + +To use the function, import `DockerPuller` and call `save_image()`: +```py +from contarner_pull import DockerPuller +puller = DockerPuller(image_url, output_path="out.tar", registry_username="", registry_password="") +puller.save_image() +``` + +## License + +Released under GNU General Public License v3.0 as `docker-drag`. +""" +LAYER_TAR_GZ = "layer.tar.gz" # !!! rename to 'layer_gzip.tar' for docker-drag !!! +from pathlib import Path +import re, os, json, time, hashlib, shutil, tarfile, urllib3, logging, argparse import requests -import tarfile -import urllib3 -urllib3.disable_warnings() +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +from typing import Iterable, Literal, Self, get_args -if len(sys.argv) != 2 : - print('Usage:\n\tdocker_pull.py [registry/][repository/]image[:tag|@digest]\n') - exit(1) +urllib3.disable_warnings() -# Look for the Docker image to download -repo = 'library' -tag = 'latest' -imgparts = sys.argv[1].split('/') +# Configure logging +logger = logging.getLogger(__name__) +# from pysnooper import snoop try: - img,tag = imgparts[-1].split('@') -except ValueError: - try: - img,tag = imgparts[-1].split(':') - except ValueError: - img = imgparts[-1] -# Docker client doesn't seem to consider the first element as a potential registry unless there is a '.' or ':' -if len(imgparts) > 1 and ('.' in imgparts[0] or ':' in imgparts[0]): - registry = imgparts[0] - repo = '/'.join(imgparts[1:-1]) -else: - registry = 'registry-1.docker.io' - if len(imgparts[:-1]) != 0: - repo = '/'.join(imgparts[:-1]) - else: - repo = 'library' -repository = '{}/{}'.format(repo, img) - -# Get Docker authentication endpoint when it is required -auth_url='https://auth.docker.io/token' -reg_service='registry.docker.io' -resp = requests.get('https://{}/v2/'.format(registry), verify=False) -if resp.status_code == 401: - auth_url = resp.headers['WWW-Authenticate'].split('"')[1] - try: - reg_service = resp.headers['WWW-Authenticate'].split('"')[3] - except IndexError: - reg_service = "" - -# Get Docker token (this function is useless for unauthenticated registries like Microsoft) -def get_auth_head(type): - resp = requests.get('{}?service={}&scope=repository:{}:pull'.format(auth_url, reg_service, repository), verify=False) - access_token = resp.json()['token'] - auth_head = {'Authorization':'Bearer '+ access_token, 'Accept': type} - return auth_head - -# Docker style progress bar -def progress_bar(ublob, nb_traits): - sys.stdout.write('\r' + ublob[7:19] + ': Downloading [') - for i in range(0, nb_traits): - if i == nb_traits - 1: - sys.stdout.write('>') - else: - sys.stdout.write('=') - for i in range(0, 49 - nb_traits): - sys.stdout.write(' ') - sys.stdout.write(']') - sys.stdout.flush() - -# Fetch manifest v2 and get image layer digests -auth_head = get_auth_head('application/vnd.docker.distribution.manifest.v2+json') -resp = requests.get('https://{}/v2/{}/manifests/{}'.format(registry, repository, tag), headers=auth_head, verify=False) -if (resp.status_code != 200): - print('[-] Cannot fetch manifest for {} [HTTP {}]'.format(repository, resp.status_code)) - print(resp.content) - auth_head = get_auth_head('application/vnd.docker.distribution.manifest.list.v2+json') - resp = requests.get('https://{}/v2/{}/manifests/{}'.format(registry, repository, tag), headers=auth_head, verify=False) - if (resp.status_code == 200): - print('[+] Manifests found for this tag (use the @digest format to pull the corresponding image):') - manifests = resp.json()['manifests'] - for manifest in manifests: - for key, value in manifest["platform"].items(): - sys.stdout.write('{}: {}, '.format(key, value)) - print('digest: {}'.format(manifest["digest"])) - exit(1) -layers = resp.json()['layers'] - -# Create tmp folder that will hold the image -imgdir = 'tmp_{}_{}'.format(img, tag.replace(':', '@')) -os.mkdir(imgdir) -print('Creating image structure in: ' + imgdir) - -config = resp.json()['config']['digest'] -confresp = requests.get('https://{}/v2/{}/blobs/{}'.format(registry, repository, config), headers=auth_head, verify=False) -file = open('{}/{}.json'.format(imgdir, config[7:]), 'wb') -file.write(confresp.content) -file.close() - -content = [{ - 'Config': config[7:] + '.json', - 'RepoTags': [ ], - 'Layers': [ ] - }] -if len(imgparts[:-1]) != 0: - content[0]['RepoTags'].append('/'.join(imgparts[:-1]) + '/' + img + ':' + tag) -else: - content[0]['RepoTags'].append(img + ':' + tag) - -empty_json = '{"created":"1970-01-01T00:00:00Z","container_config":{"Hostname":"","Domainname":"","User":"","AttachStdin":false, \ - "AttachStdout":false,"AttachStderr":false,"Tty":false,"OpenStdin":false, "StdinOnce":false,"Env":null,"Cmd":null,"Image":"", \ - "Volumes":null,"WorkingDir":"","Entrypoint":null,"OnBuild":null,"Labels":null}}' - -# Build layer folders -parentid='' -for layer in layers: - ublob = layer['digest'] - # FIXME: Creating fake layer ID. Don't know how Docker generates it - fake_layerid = hashlib.sha256((parentid+'\n'+ublob+'\n').encode('utf-8')).hexdigest() - layerdir = imgdir + '/' + fake_layerid - os.mkdir(layerdir) - - # Creating VERSION file - file = open(layerdir + '/VERSION', 'w') - file.write('1.0') - file.close() - - # Creating layer.tar file - sys.stdout.write(ublob[7:19] + ': Downloading...') - sys.stdout.flush() - auth_head = get_auth_head('application/vnd.docker.distribution.manifest.v2+json') # refreshing token to avoid its expiration - bresp = requests.get('https://{}/v2/{}/blobs/{}'.format(registry, repository, ublob), headers=auth_head, stream=True, verify=False) - if (bresp.status_code != 200): # When the layer is located at a custom URL - bresp = requests.get(layer['urls'][0], headers=auth_head, stream=True, verify=False) - if (bresp.status_code != 200): - print('\rERROR: Cannot download layer {} [HTTP {}]'.format(ublob[7:19], bresp.status_code, bresp.headers['Content-Length'])) - print(bresp.content) - exit(1) - # Stream download and follow the progress - bresp.raise_for_status() - unit = int(bresp.headers['Content-Length']) / 50 - acc = 0 - nb_traits = 0 - progress_bar(ublob, nb_traits) - with open(layerdir + '/layer_gzip.tar', "wb") as file: - for chunk in bresp.iter_content(chunk_size=8192): - if chunk: - file.write(chunk) - acc = acc + 8192 - if acc > unit: - nb_traits = nb_traits + 1 - progress_bar(ublob, nb_traits) - acc = 0 - sys.stdout.write("\r{}: Extracting...{}".format(ublob[7:19], " "*50)) # Ugly but works everywhere - sys.stdout.flush() - with open(layerdir + '/layer.tar', "wb") as file: # Decompress gzip response - unzLayer = gzip.open(layerdir + '/layer_gzip.tar','rb') - shutil.copyfileobj(unzLayer, file) - unzLayer.close() - os.remove(layerdir + '/layer_gzip.tar') - print("\r{}: Pull complete [{}]".format(ublob[7:19], bresp.headers['Content-Length'])) - content[0]['Layers'].append(fake_layerid + '/layer.tar') - - # Creating json file - file = open(layerdir + '/json', 'w') - # last layer = config manifest - history - rootfs - if layers[-1]['digest'] == layer['digest']: - # FIXME: json.loads() automatically converts to unicode, thus decoding values whereas Docker doesn't - json_obj = json.loads(confresp.content) - del json_obj['history'] - try: - del json_obj['rootfs'] - except: # Because Microsoft loves case insensitiveness - del json_obj['rootfS'] - else: # other layers json are empty - json_obj = json.loads(empty_json) - json_obj['id'] = fake_layerid - if parentid: - json_obj['parent'] = parentid - parentid = json_obj['id'] - file.write(json.dumps(json_obj)) - file.close() - -file = open(imgdir + '/manifest.json', 'w') -file.write(json.dumps(content)) -file.close() - -if len(imgparts[:-1]) != 0: - content = { '/'.join(imgparts[:-1]) + '/' + img : { tag : fake_layerid } } -else: # when pulling only an img (without repo and registry) - content = { img : { tag : fake_layerid } } -file = open(imgdir + '/repositories', 'w') -file.write(json.dumps(content)) -file.close() - -# Create image tar and clean tmp folder -docker_tar = repo.replace('/', '_') + '_' + img + '.tar' -sys.stdout.write("Creating archive...") -sys.stdout.flush() -tar = tarfile.open(docker_tar, "w") -tar.add(imgdir, arcname=os.path.sep) -tar.close() -shutil.rmtree(imgdir) -print('\rDocker image pulled: ' + docker_tar) + from aria2p_wrapper import Aria, File, Log + + aria = Aria() +except ImportError: + logger.warning("pip install aria2p_wrapper for aria2c support, fallback!") + +TYPE_REGISTRY = Literal[ + "registry.k8s.io", + "registry.gitlab.com", + "ghcr.io", + "quay.io", + "docker.io", +] +REGISTRY = [reg + "/" for reg in get_args(TYPE_REGISTRY)] + + +def create_session(retry=3) -> requests.Session: + session = requests.Session() + retry_strategy = Retry( + total=retry, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + session.mount("http://", adapter) + session.mount("https://", adapter) + + # Check if proxy environment variables are set + http_proxy = os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy") + https_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") + + if http_proxy or https_proxy: + session.proxies = {"http": http_proxy, "https": https_proxy} # type: ignore + logger.info("[+] Using proxy settings from environment") + + return session + + +############################################ HELPERS ###################################################### + + +class DockerImage: + original_url: str + """Original URL of the Docker image""" + auth_url: str + """Authentication URL for Docker Image Registry; should get from image registry""" + registry_host_url: str = "registry-1.docker.io" + """Registry URL for Docker Image Registry""" + repo_name: str + """Image name without registry and tag""" + tag: str = "latest" + """Image tag""" + + accept_manifest = [ + "application/vnd.docker.distribution.manifest.v2+json", + "application/vnd.docker.distribution.manifest.list.v2+json", + "application/vnd.oci.image.manifest.v1+json", + "application/vnd.oci.image.index.v1+json", + ] + + @property + def Accept(self): + return ",".join(self.accept_manifest) + + def __init__( + self, + original_url: str, + auth_url: str, + registry_host_url: str, + repo_name: str, + tag: str, + ): + self.original_url: str = original_url + self.auth_url: str = auth_url + self.registry_host_url: str = registry_host_url + self.repo_name: str = repo_name + self.tag: str = tag + + def get_manifest_url(self, manifest_name: str = "") -> str: + name = self.tag if not manifest_name else manifest_name + return f"https://{self.registry_host_url}/v2/{self.repo_name}/manifests/{name}" + + def get_blobs_url(self, digest: str) -> str: + """blob digest = sha256:""" + return f"https://{self.registry_host_url}/v2/{self.repo_name}/blobs/{digest}" + + @property + def repository_reference(self) -> str: + """Repository reference (registry and repository) without tag""" + if self.registry_host_url == self.__class__.registry_host_url: + return f"{self.repo_name}" + else: + return f"{self.registry_host_url}/{self.repo_name}" + + @property + def full_name(self) -> str: + """Fully qualified image name (registry, repository and tag)""" + return f"{self.repository_reference}:{self.tag}" + + @property + def tar_filename(self) -> str: + return f"{self.repo_name.replace('/', '-')}-{self.tag}.tar" + + def get_auth_headers( + self, + session: requests.Session, + username: str = "", + password: str = "", + ): + """Get Docker request authentication token for header. + This function is useless for unauthenticated registries like Microsoft. + + For registries that allows authenticate with PAT (e.g. Forgejo), you may provide the token explicitly as keyword argument `password="TOKEN"` without a `username`. + """ + try: + # setup authentication with username and password + auth = None + if username and password != "": + auth = (username, password) + + resp = session.get(self.auth_url, verify=False, timeout=30, auth=auth) + if resp.status_code != 200: + # authentication failed or user error + raise requests.exceptions.RequestException( + f"{resp.status_code}: {resp.content.decode()}", + request=resp.request, + response=resp, + ) + js: dict = resp.json() + auth_head = { + **( + {"Authorization": "Bearer " + js["token"]} + if "token" in js.keys() + else {} + ), + "Accept": self.Accept, + } + return auth_head + except requests.exceptions.RequestException as e: + logger.error(f"[-] Authentication error: {e}") + raise + + @staticmethod + def get_endpoint_registry( + session: requests.Session, registry_host_url: str, repository: str + ): + """Get endpoint registry from url""" + # default auth url is same as registry url + server_auth_url = "https://" + registry_host_url + "/v2/" + try: + logger.info(f"[+] Connecting to registry: {registry_host_url}") + resp = session.get( + f"https://{registry_host_url}/v2/", verify=False, timeout=30 + ) + if resp.status_code == 401: + try: + realm_address = re.search( + 'realm="([^"]*)"', resp.headers["WWW-Authenticate"] + ) + assert realm_address is not None # type check + + # If Repository is on NEXUS OSS + if realm_address.group(1) == "Sonatype Nexus Repository Manager": + server_auth_url = "https://" + registry_host_url + "/v2/" + logger.debug("[ ] Detected: Nexus OSS repository type") + + # If Repository is on DockerHub like + elif realm_address.group( + 1 + ) != registry_host_url and "http" in realm_address.group(1): + service = re.search( + 'service="([^"]*)"', resp.headers["WWW-Authenticate"] + ) + assert service is not None # type check + server_auth_url = f"{realm_address.group(1)}?service={service.group(1)}&scope=repository:{repository}:pull" + logger.debug("[ ] Detected: Docker Hub repository type") + + except IndexError: + logger.info( + "[-] Failed to fetch authentication endpoint info from registry, using registry URL" + ) + + return server_auth_url + except requests.exceptions.RequestException as e: + logger.error("[-] Connection error:", str(e)) + logger.error("[*] Troubleshooting tips:") + logger.error(" 1. Check your internet connection") + logger.error( + " 2. If you are behind a proxy, set HTTP_PROXY and HTTPS_PROXY environment variables" + ) + logger.error(" 3. Try using a VPN if the registry is blocked") + logger.error( + f" 4. Verify if the registry {registry_host_url} is accessible from your network" + ) + raise + + @classmethod + def parse_image_name(cls, image_name: str, session: requests.Session) -> Self: + img = None + tag = cls.tag + registry_host_url = cls.registry_host_url + + # Look for the Docker image to download + imgparts = image_name.split("/") + try: + img, tag = imgparts[-1].split("@") + except ValueError: + try: + img, tag = imgparts[-1].split(":") + except ValueError: + img = imgparts[-1] + + # Docker client doesn't seem to consider the first element as a potential registry unless there is a '.' or ':' + if len(imgparts) > 1 and ("." in imgparts[0] or ":" in imgparts[0]): + registry_host_url = imgparts[0] + repo_without_last = "/".join(imgparts[1:-1]) + else: + if len(imgparts[:-1]) != 0: + repo_without_last = "/".join(imgparts[:-1]) + else: + repo_without_last = "library" + repository = f"{repo_without_last}/{img}" + auth_url = cls.get_endpoint_registry(session, registry_host_url, repository) + logger.info(f"Auth endpoint :\t{auth_url}") + + return cls( + original_url=image_name, + auth_url=auth_url, + registry_host_url=registry_host_url, + repo_name=repository, + tag=tag, + ) + + +def Print(*args): + print(*args, end="", flush=True) + + +def progress_bar(sha, nb_traits): + """Docker style [>>==]""" + Print(f"\r{sha[7:19]}: Downloading [") + for i in range(0, nb_traits): + if i == nb_traits - 1: + Print(">") + else: + Print("=") + Print(" " * (49 - nb_traits) + "]") + + +############################################## MAIN ######################################################## + + +class DockerPull: + """ + Orchestrates downloading a docker image and saving it as a tar archive. + + Args: + image_url: docker image name in the format of [registry/][repository/]image[:tag|@digest] + output_path: file path for final tar (if ends not with .tar it will be appended) + registry_username: registry username (empty string by default) + registry_password: registry password or PAT (empty string by default) + session: requests.Session instance. Default uses create_session() result. + """ + + _aria_valid: bool = "aria" in globals() + + def __init__( + self, + image_url: str, + output_path: str = "", + registry_username: str = "", + registry_password: str = "", + session: requests.Session = create_session(), + only: Iterable[str] = set(), + skip: Iterable[int] = set(), + retry=10, + aria2=True, + verify=False, + keep_tmp=False, + verbose=False, + ): + self.output_path = output_path + self.registry_username = registry_username + self.registry_password = registry_password + self.session = session + self.image = DockerImage.parse_image_name(image_url, self.session) + self.tmp_dir = str(self.image.tar_filename).replace(".tar", "") + self.only = only + self.skip = skip + self.retry = retry + self._aria_valid = aria2 and self._aria_valid + self.verify = verify + self.keep_tmp = keep_tmp + self.verbose = verbose + # placeholder for values filled during flow + self.manifest_json = {} + self.confresp_content = b"" + self.fake_layerID = "" + self.unzip_queue: list[Path] = [] + + # @snoop("pull.log", depth=3) + def save_image(self) -> None: + """use temp dir `IMAGE_NAME-TAG/` to save tars, then create final tar from there. + + Args: + skip: list of layer indexes to skip + keep_tmp: whether to keep temp dir `IMAGE_NAME-TAG/` after creating tar + retry: could set to <=0 to disable retry. Continuous retry counts when fail repeatdly. + """ + if not os.path.exists(self.tmp_dir): + logger.debug("[+] Creating temporary directory: %s", self.tmp_dir) + os.makedirs(self.tmp_dir) + + tried = 0 + retry = self.retry + while retry > 0: + try: + self._fetch_manifest() + self._download_config() + self._download_layers(skip=self.skip) + break # Success + except Exception as e: + tried += 1 + if tried >= retry: + raise RuntimeError("[-] All retries failed") from e + timer = int(2 + 1.5 ** (tried - 1)) + logger.error(f"[-] Retry after {timer} seconds from error: {e}") + time.sleep(timer) # Exponential backoff + self._create_tar(keep_tmp=self.keep_tmp) + + logger.info( + f"[+] Docker image for {self.image.full_name} is saved to {self.output_path or self.image.tar_filename}" + ) + if not self.keep_tmp and os.path.exists(self.tmp_dir): + shutil.rmtree(self.tmp_dir) + + def _fetch_manifest(self): + logger.info(f"[+] Trying to fetch manifest for {self.image.full_name}") + try: + resp = self.session.get( + self.image.get_manifest_url(), + headers=self.image.get_auth_headers( + self.session, self.registry_username, self.registry_password + ), + verify=self.verify, + timeout=30, + ) + except requests.exceptions.RequestException as e: + logger.error("[-] Manifest fetch error: %s", str(e)) + raise + + if resp.status_code != 200: + logger.error( + f"[-] Cannot fetch manifest for {self.image.registry_host_url} [HTTP {resp.status_code}]" + ) + logger.error(resp.content) + raise + + try: + resp_json = resp.json() + logger.debug("[+] Response JSON structure:") + logger.debug(json.dumps(resp_json, indent=2)) + + # Handle manifest list (multi-arch images) kept compact + if "manifests" in resp_json: + selected_manifest = self._select_manifest(resp_json) + try: + manifest_resp = self.session.get( + self.image.get_manifest_url(selected_manifest["digest"]), + headers=self.image.get_auth_headers( + self.session, self.registry_username, self.registry_password + ), + verify=self.verify, + timeout=30, + ) + if manifest_resp.status_code != 200: + logger.error( + "[-] Failed to fetch specific manifest: %s", + manifest_resp.status_code, + ) + logger.error("[-] Response content: %s", manifest_resp.content) + raise + resp_json = manifest_resp.json() + logger.info("[+] Successfully fetched specific manifest") + except Exception as e: + logger.error("[-] Error fetching specific manifest: %s", e) + raise + + if "layers" not in resp_json: + logger.error("[-] Error: No layers found in manifest") + logger.error("[-] Available keys: %s", list(resp_json.keys())) + raise + + self.manifest_json = resp_json + + except KeyError as e: + logger.error("[-] Error: Could not find required key in response: %s", e) + logger.error("[-] Available keys: %s", list(resp_json.keys())) + raise + except Exception as e: + logger.error("[-] Unexpected error: %s", e) + raise + + def _select_manifest(self, resp_json: dict) -> dict: + logger.debug("[+] This is a multi-arch image. Scanning manifests") + # choose linux/amd64 first, fallback windows/amd64, else first + selected = None + for m in resp_json["manifests"]: + platform = m.get("platform", {}) + if ( + platform.get("os") == "linux" + and platform.get("architecture") == "amd64" + ): + selected = m + break + if not selected: + for m in resp_json["manifests"]: + platform = m.get("platform", {}) + if ( + platform.get("os") == "windows" + and platform.get("architecture") == "amd64" + ): + selected = m + break + if not selected: + selected = resp_json["manifests"][0] + logger.info( + "[+] Selected platform: %s/%s", + selected.get("platform", {}).get("os", "unknown"), + selected.get("platform", {}).get("architecture", "unknown"), + ) + return selected + + def _download_config(self): + config_digest = self.manifest_json["config"]["digest"] + try: + confresp = self.session.get( + self.image.get_blobs_url(config_digest), + headers=self.image.get_auth_headers( + self.session, self.registry_username, self.registry_password + ), + verify=self.verify, + timeout=30, + ) + except requests.exceptions.RequestException as e: + logger.error("[-] Config fetch error: %s", str(e)) + raise + self.confresp_content = confresp.content + with open(f"{self.tmp_dir}/{config_digest[7:]}.json", "wb") as file: + file.write(self.confresp_content) + + # create base manifest file content list + self._manifest_content = [ + { + "Config": config_digest[7:] + ".json", + "RepoTags": [self.image.full_name], + "Layers": [], + } + ] + + def _compute_fake_layerID(self, parentid: str, sha: str) -> str: + """Compute fake layer id from parentid and digest.""" + return hashlib.sha256( + (parentid + "\n" + sha + "\n").encode("utf-8") + ).hexdigest() + + def _prepare_layer_dir(self, fake_layerID: str): + """Create directory for a layer and write VERSION file.""" + layerdir = Path(self.tmp_dir) / fake_layerID + layerdir.mkdir(parents=True, exist_ok=True) + with open(layerdir / "VERSION", "w") as f: + f.write("1.0") + return layerdir + + def _fetch_layer_response(self, layer: dict, sha: str) -> requests.Response: + """Fetch layer content (primary blob URL then fallback to layer['urls'][0]).""" + try: + bresp = self.session.get( + self.image.get_blobs_url(sha), + headers=self.image.get_auth_headers( + self.session, self.registry_username, self.registry_password + ), + stream=True, + verify=self.verify, + timeout=30, + ) + except requests.exceptions.RequestException as e: + logger.error("[-] Layer fetch error: %s", str(e)) + raise + + if bresp.status_code != 200: + # fallback to layer["urls"][0] if provided + fallback_url = None + if isinstance(layer.get("urls"), list) and layer["urls"]: + fallback_url = layer["urls"][0] + if fallback_url: + try: + bresp = self.session.get( + fallback_url, + headers=self.image.get_auth_headers( + self.session, self.registry_username, self.registry_password + ), + stream=True, + verify=self.verify, + timeout=30, + ) + except requests.exceptions.RequestException as e: + logger.error("[-] Layer fetch error: %s", str(e)) + raise + if bresp.status_code != 200: + logger.error( + "[-] ERROR: Cannot download layer %s [HTTP %s]", + sha[7:19], + bresp.status_code, + ) + logger.error(bresp.content) + raise + return bresp + + def _stream_save_gzip(self, bresp: requests.Response, gzip: Path, sha: str) -> None: + """Stream response content into gzip_path while updating the progress bar.""" + if self._aria_valid: + try: + file = File(bresp.url, path=gzip, sha256=sha[7:]) + aria.download(file, response=bresp) + return + except Exception as e: + # if logger.isEnabledFor(logging.DEBUG): + # raise e + logger.error(f"[-] aria failed download layer {sha[7:19]}: {e}") + + # 普通下载方式 + content_length = int(bresp.headers.get("Content-Length", "0")) + unit = content_length / 50 if content_length else 1 + acc = 0 + nb_traits = 0 + progress_bar(sha, nb_traits) + with open(gzip, "wb") as file: + for chunk in bresp.iter_content(chunk_size=8192): + if chunk: + file.write(chunk) + acc += len(chunk) + if acc > unit: + nb_traits += 1 + progress_bar(sha, nb_traits) + acc = 0 + + def _decompress_layer( + self, gzip: Path, sha: str = "sha256:", buffer=1024**3, remove=True + ) -> None: + """gzip to tar, limit buffer default to 1GB""" + sha = sha[7:19] + if not sha: + sha = gzip.parent.name[7:19] + logger.info(f"{sha}: Extracting `layer.tar.gz` to `layer.tar`") + + try: + with open(os.path.join(gzip.parent, "layer.tar"), "wb") as tar_f: + with gzip.open("rb") as gz_f: + # 使用有限缓冲区进行解压,防止内存占用过大 + while chunk := gz_f.read(buffer): + tar_f.write(chunk) + + logger.info(f"{sha}: Extraction OK") + try: + if remove and os.path.exists(gzip): + os.remove(gzip) + except Exception as e: + logger.debug(f"[-] Failed to remove temp gzip {gzip}: {e}") + except Exception as e: + logger.error(f"[-] Extraction failed on {sha}: {e}") + raise + + def _build_and_write_layer_json( + self, + is_last: bool, + fake_layerID: str, + parentid: str, + empty_json: str, + layerdir: str, + ) -> str: + """ + Build the layer json structure and write to file. + Returns the new parentid (which equals fake_layerID). + """ + if is_last: + json_obj = json.loads(self.confresp_content) + # attempt to remove keys similar to original behavior + if "history" in json_obj: + del json_obj["history"] + try: + del json_obj["rootfs"] + except Exception: + try: + del json_obj["rootfS"] + except Exception: + pass + else: + json_obj = json.loads(empty_json) + + json_obj["id"] = fake_layerID + if parentid: + json_obj["parent"] = parentid + + with open(os.path.join(layerdir, "json"), "w") as jf: + jf.write(json.dumps(json_obj)) + + return json_obj["id"] + + def _download_layers( + self, + only: Iterable[str] = set(), + skip: Iterable[int] = set(), + keep_tmp=False, + ): + """High-level loop over manifest layers delegating to helper methods.""" + layers: list[dict[str, str]] = self.manifest_json["layers"] + empty_json = '{"created":"1970-01-01T00:00:00Z","container_config":{"Hostname":"","Domainname":"","User":"","AttachStdin":false, \ + "AttachStdout":false,"AttachStderr":false,"Tty":false,"OpenStdin":false, "StdinOnce":false,"Env":null,"Cmd":null,"Image":"", \ + "Volumes":null,"WorkingDir":"","Entrypoint":null,"OnBuild":null,"Labels":null}}' + + parentid = "" + len_layers = len(layers) + + for i, layer in enumerate(layers): + sha = layer["digest"] + fake_layerID = self._compute_fake_layerID(parentid, sha) + if i + 1 in skip: + logger.debug( + f"{i+1}/{len_layers}\t{layer['digest'][7:19]}: Skip {fake_layerID[:12]}" + ) + continue + if only and not any([fake_layerID.startswith(ol) for ol in only]): + logger.debug( + f"{i+1}/{len_layers}\t{layer['digest'][7:19]}: Skip {fake_layerID[:12]}" + ) + continue + layerDir = self._prepare_layer_dir(fake_layerID) + tar_gzip = layerDir / LAYER_TAR_GZ + + # Download layer (stream) + logger.info(f"{i+1}/{len_layers}\t{sha[7:19]}: Downloading...") + bresp = self._fetch_layer_response(layer, sha) + self._stream_save_gzip(bresp, tar_gzip, sha) + self.unzip_queue.append(tar_gzip) + + if not (self._aria_valid): + logger.info( + f"{sha[7:19]}: Downloaded [{bresp.headers.get('Content-Length')}]" + ) + + self._manifest_content[0]["Layers"].append(fake_layerID + "/layer.tar") + + # Create layer json and update parentid + is_last = layers[-1]["digest"] == layer["digest"] + parentid = self._build_and_write_layer_json( + is_last, fake_layerID, parentid, empty_json, str(layerDir) + ) + + # record last fake id for repositories mapping + self.fake_layerID = fake_layerID + # download_tasks.append((gzip_path, sha)) + + # 等待所有aria任务完成 + if self._aria_valid: + aria.wait_all() + + def _create_tar(self, keep_tmp=False): + """after tar created, remove all layer.tar.gz files""" + logger.info("[=] Decompressing layers...") + gzip_lack = [g for g in self.unzip_queue if not g.exists()] + for gzip in self.unzip_queue: + if gzip.exists(): + self._decompress_layer(gzip, remove=not keep_tmp) + if gzip_lack: + raise FileNotFoundError(f"{gzip_lack}") + + # manifest.json + with open(os.path.join(self.tmp_dir, "manifest.json"), "w") as mf: + mf.write(json.dumps(self._manifest_content)) + + content = {self.image.repository_reference: {self.image.tag: self.fake_layerID}} + with open(os.path.join(self.tmp_dir, "repositories"), "w") as rf: + rf.write(json.dumps(content)) + + logger.info("[=] Creating archive...") + out_path = self.output_path or self.image.tar_filename + if not str(out_path).endswith(".tar"): + out_path = str(out_path) + ".tar" + with tarfile.open(out_path, "w") as tar: + tar.add(self.tmp_dir, arcname=os.path.sep) + self.output_path = out_path + + +def parse_arg(): + parser = argparse.ArgumentParser( + description="Pull a Docker image and save it as a tar archive." + ) + action = parser.add_argument( + "image", + help="docker image name in the format of [registry/][repository/]image[:tag|@digest]", + ) + action.completer = lambda prefix, **kwargs: REGISTRY # type: ignore + parser.add_argument( + "output_path", nargs="?", help="output path for the image tar", default="" + ) + parser.add_argument( + "--username", + "-u", + help="container registry username", + default=os.getenv("REGISTRY_USERNAME", ""), + ) + parser.add_argument( + "--password", + "-p", + help="container registry password. Pass PAT to this parameter without --username if you are using a Personal Access Token (PAT)", + default=os.getenv("REGISTRY_PASSWORD", ""), + ) + parser.add_argument( + "--skip", + "-s", + type=str, + help="skip which layers, eg: 1,2 (Don't download) / eg2: !1,2 (Download only) / eg3: !0abc (hash folder prefix with at least 4 characters)", + ) + parser.add_argument( + "--aria-not", + action="store_true", + help="do NOT use aria2c to download layers", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="keep temp files, set logging level to DEBUG", + ) + parser.add_argument( + "--verify", + action="store_true", + help="verify SSL certificate", + ) + try: + import argcomplete + + argcomplete.autocomplete(parser) + except ImportError: + ... + args = parser.parse_args() + return args + + +def script_entry(): + args = parse_arg() + LOGLVL = logging.DEBUG if args.verbose else logging.INFO + logger.setLevel(LOGLVL) + if "Log" in globals(): + Log.setLevel(LOGLVL) + skip = only = set() + if args.skip: + txt = str(args.skip).strip() + if is_not := txt.startswith("!"): + txt = txt[1:] + txt = txt.split(",") + skip = {int(x) for x in txt if len(x) <= 3} + only = {x for x in txt if len(x) >= 4} + if is_not: + skip = set(range(1, 1000)) - skip # TODO: Edge case + try: + puller = DockerPull( + args.image, + output_path=args.output_path, + registry_username=args.username, + registry_password=args.password, + only=only, + skip=skip, + verify=args.verify, + aria2=not args.aria_not, + keep_tmp=args.verbose, + verbose=args.verbose, + ) + puller.save_image() + except KeyboardInterrupt: + logger.warning("[-] Interrupted by user") + exit(1) + + +if __name__ == "__main__": + script_entry() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..05b2c22 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,53 @@ +[build-system] +build-backend = "setuptools.build_meta" +requires = [ "setuptools>=61.0", "wheel" ] + + +[project] +dependencies = [ ] +description = "download docker image as .tar file for `docker load -i img.tar` thanks to aria2p" +keywords = [ "oci", "docker", "podman", "container", "aria2c", "aria2p" ] +name = "docker_pull" +readme = "README.md" +version = "0.0.1" +classifiers = [ +"License :: OSI Approved :: MIT License", +"Programming Language :: Python", +"Development Status :: 3 - Alpha", +"Intended Audience :: Developers", +"Programming Language :: Python :: 3", +"Programming Language :: Python :: 3.11", +"Programming Language :: Python :: 3.12", +"Programming Language :: Python :: 3.13" +] +requires-python = ">=3.11" + + +[[project.authors]] +name = "NotGlop, slkoo-cc, heran226813, lenrys29, nolca" +email = "nolca@qq.com" + + +[project.license] +file = "LICENSE" + + +[project.optional-dependencies] +dev = [ "pytest", "pytest-asyncio" ] +aria2 = [ +"aria2p", +"aria2p_wrapper@git+https://github.com/AClon314/aria2p_wrapper.git" +] + + +[project.urls] +Download = "http://github.com/AClon314/aria2p_wrapper/releases" +Homepage = "http://github.com/AClon314/aria2p_wrapper" + + +[project.scripts] +docker-pull = "docker_pull.docker_pull:script_entry" + + +[tool.setuptools.package-dir] +docker_pull = "."