Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ jobs:

steps:
- uses: actions/checkout@v4
- name: Obtain dependency projects
run: git clone https://github.com/desultory/zenlib
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
Expand All @@ -30,8 +28,21 @@ jobs:
run: |
python -m venv venv
venv/bin/pip install --upgrade pip
venv/bin/pip install ./zenlib
venv/bin/pip install .

- name: Obtain dependency projects
run: |
mkdir ../deps/zenlib -p
git clone https://github.com/desultory/zenlib ../deps/zenlib
venv/bin/pip install ../deps/zenlib

- name: Run mypy type checks
run: |
venv/bin/pip install mypy
venv/bin/mypy ../pycpio

- name: Install pycpio
run: venv/bin/pip install .

- name: Run unit tests
run: |
cd tests
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "pycpio"
version = "1.6.0"
version = "1.6.1"

authors = [
{ name="Desultory", email="dev@pyl.onl" },
Expand All @@ -18,6 +18,9 @@ classifiers = [
]
dependencies = ["zenlib >= 3.0.2"]

[options.package_data]
pycpio = "py.typed"

[project.optional-dependencies]
zstd = ["zstandard"]

Expand Down
6 changes: 3 additions & 3 deletions src/pycpio/cpio/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@
from pycpio.cpio.file import CPIO_File
from pycpio.cpio.symlink import CPIO_Symlink
from pycpio.header import HEADER_NEW
from zenlib.logging import loggify
from zenlib.logging import LoggerMixIn
from zenlib.util import colorize as c_
from zenlib.util import handle_plural


@loggify
class CPIOArchive(dict):
class CPIOArchive(dict, LoggerMixIn):
def __setitem__(self, name, value):
if name in self:
raise AttributeError(f"Entry already exists: {c_(name, 'red')}")
Expand Down Expand Up @@ -115,6 +114,7 @@ def __getitem__(self, name):
return super().__getitem__(self._normalize_name(name))

def __init__(self, structure=HEADER_NEW, reproducible=False, *args, **kwargs):
self.init_logger(args, kwargs)
self.structure = structure
self.reproducible = reproducible
self.inodes = {}
Expand Down
4 changes: 2 additions & 2 deletions src/pycpio/cpio/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
MAX_INODES = 0xFFFFFFFF


def pad_cpio(size, align=4):
def pad_cpio(size: int, align: int = 4):
"""Pad size to align bytes."""
return ((size + align - 1) & ~(align - 1)) - size


def get_new_inode(existing_inodes):
def get_new_inode(existing_inodes: dict[str, list[int]]):
"""Get a new inode number."""
if not existing_inodes:
return 1
Expand Down
22 changes: 15 additions & 7 deletions src/pycpio/cpio/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,31 @@ def from_dir(path: Path, parent=None, relative=False, *args, **kwargs):
kwargs["name"] = str(path)

data = []
data.append(CPIOData.from_path(path=path, relative=relative, *args, **kwargs))
top_kwargs = kwargs.copy()
top_kwargs["path"] = path
top_kwargs["relative"] = relative
data.append(CPIOData.from_path(*args, **top_kwargs))
for child in path.iterdir():
child_kwargs = kwargs.copy()

if parent:
child_path = parent / child
else:
child_path = child

if relative:
kwargs["name"] = str(child_path.relative_to(relative))
child_kwargs["name"] = str(child_path.relative_to(relative))
else:
kwargs["name"] = str(child_path)
child_kwargs["name"] = str(child_path)

child_kwargs["path"] = child_path
child_kwargs["relative"] = relative

if child.is_dir() and not child.is_symlink():
data.extend(CPIOData.from_dir(path=child_path, parent=parent, relative=relative, *args, **kwargs))
child_kwargs["parent"] = parent
data.extend(CPIOData.from_dir(*args, **child_kwargs))
else:
data.append(CPIOData.from_path(path=child_path, relative=relative, *args, **kwargs))
data.append(CPIOData.from_path(*args, **child_kwargs))
return data

@staticmethod
Expand Down Expand Up @@ -92,7 +101,6 @@ def from_path(path: Path, relative=False, resolve_symlink=False, *args, **kwargs
if not path.exists():
raise ValueError("Path does not exist: %s" % path)

kwargs["path"] = path
# If a name is provided, use it, otherwise, use the path, if relative is provided, use the relative path
if name := kwargs.pop("name", None):
kwargs["name"] = name
Expand All @@ -118,7 +126,7 @@ def from_path(path: Path, relative=False, resolve_symlink=False, *args, **kwargs
kwargs["rdevminor"] = kwargs.pop("rdevminor", os.minor(path.stat(follow_symlinks=resolve_symlink).st_rdev))

header = CPIOHeader(*args, **kwargs)
data = CPIOData.get_subtype(b"", header, *args, **kwargs)
data = CPIOData.get_subtype(b"", header, path=path, *args, **kwargs)

if logger := kwargs.get("logger"):
logger.debug(f"Created CPIO entry from path: {data}")
Expand Down
13 changes: 7 additions & 6 deletions src/pycpio/header/cpioheader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
from pycpio.header.header_funcs import get_header_from_magic, get_magic_from_header
from pycpio.header.headers import HEADER_NEW
from pycpio.masks import print_permissions, resolve_mode_bytes, resolve_permissions
from zenlib.logging import loggify
from zenlib.logging import LoggerMixIn
from zenlib.util import colorize as c_


@loggify
class CPIOHeader:
class CPIOHeader(LoggerMixIn):
"""CPIO HEADER, can be initialized from a segment of header data with or without a structure definition."""

def __init__(self, header_data=b"", overrides={}, *args, **kwargs):
self.init_logger(args, kwargs)
self.overrides = overrides
if header_data:
self.logger.debug("Creating CPIOEntry from header data: %s", header_data)
Expand Down Expand Up @@ -138,8 +138,9 @@ def process_overrides(self) -> None:
if hasattr(self, attribute):
self.logger.log(5, "[%s] Pre-override: %s" % (attribute, getattr(self, attribute)))
if attribute == "mode":
mode = int(getattr(self, "mode", b"00000000"), 16)
# Mask the mode, then add the override
value = (int(self.mode, 16) & 0o7777000) | (self.overrides[attribute] & 0o777)
value = (mode & 0o7777000) | (self.overrides[attribute] & 0o777)
else:
value = self.overrides[attribute]
self.logger.debug("[%s] Setting override: %s" % (attribute, value))
Expand All @@ -148,13 +149,13 @@ def process_overrides(self) -> None:
def _read_bytes(self, num_bytes: int) -> bytes:
"""Read the specified number of bytes from the data, incrementing the offset, then returning the data."""
data = self.data[self.offset : self.offset + num_bytes]
self.logger.log(5, "Read %d bytes: %s" % (num_bytes, data))
self.logger.log(5, "Read %s bytes: %r" % (num_bytes, data))
self.offset += num_bytes
return data

def add_data(self, data: bytes) -> None:
"""Add the file data to the object."""
self.logger.debug("Adding data: %s" % data)
self.logger.debug("Adding data: %r" % data)
self.data += data

def parse_header(self):
Expand Down
4 changes: 2 additions & 2 deletions src/pycpio/header/header_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ def get_header_from_magic(magic: bytes) -> dict:
for magic, header_type in lookup_table.items():
if magic == magic:
return header_type
raise ValueError("Unknown magic number: %s" % magic)
raise ValueError("Unknown magic number: %r" % magic)


def get_magic_from_header(header: dict) -> dict:
def get_magic_from_header(header: dict) -> bytes:
"""Return the magic number for the given header format."""
for magic, header_type in lookup_table.items():
if header_type == header:
Expand Down
13 changes: 0 additions & 13 deletions src/pycpio/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,6 @@ def main():
raise ValueError("Character device requires minor number")
c.add_chardev(chardev_path, major, minor)

if append_file := kwargs.get("append"):
cmdargs = {
"relative": kwargs.get("relative"),
"path": Path(append_file),
"name": kwargs.get("name"),
"absolute": kwargs.get("absolute"),
}

c.append_cpio(**cmdargs)

if recursive_path := kwargs.get("recursive"):
cmdargs = {"relative": kwargs.get("relative"), "path": Path(recursive_path)}
c.append_recursive(**cmdargs)

if output_file := kwargs.get("output"):
compression = kwargs.get("compress")
Expand Down
12 changes: 9 additions & 3 deletions src/pycpio/masks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
from pycpio.masks.modes import CPIOModes, mode_bytes_from_path, resolve_mode_bytes
from pycpio.masks.permissions import Permissions, print_permissions, resolve_permissions
from pycpio.masks.modes import CPIOModes, resolve_mode_bytes, mode_bytes_from_path

__all__ = [Permissions, print_permissions, resolve_permissions,
CPIOModes, resolve_mode_bytes, mode_bytes_from_path]
__all__ = [
"Permissions",
"print_permissions",
"resolve_permissions",
"CPIOModes",
"resolve_mode_bytes",
"mode_bytes_from_path",
]
12 changes: 8 additions & 4 deletions src/pycpio/masks/modes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum
from pathlib import Path

from typing import Union

class CPIOModes(Enum):
"""
Expand All @@ -16,9 +16,11 @@ class CPIOModes(Enum):
FIFO = 0o010000 # FIFO


def resolve_mode_bytes(mode: bytes) -> CPIOModes:
def resolve_mode_bytes(mode: bytes) -> Union[CPIOModes, None]:
"""
Resolve the mode mask from the given bytes.

If the mode is 0, return None (trailer).
"""
mode_int = int(mode, 16)
# Handle the trailer
Expand All @@ -29,14 +31,16 @@ def resolve_mode_bytes(mode: bytes) -> CPIOModes:
if cpiomode.value & mode_int == cpiomode.value:
return cpiomode

raise ValueError(f"Unknown mode: {mode}")
raise ValueError("Unknown mode: %r" % mode)


def mode_bytes_from_path(file_path: Path) -> CPIOModes:
def mode_bytes_from_path(file_path: Path) -> int:
"""
Gets the mode type bytes from the given path.
The order of the checks is important,
as some types are subsets of others.

returns the integer value of the mode.
"""
if file_path.is_symlink():
return CPIOModes.Symlink.value
Expand Down
Empty file added src/pycpio/py.typed
Empty file.
16 changes: 8 additions & 8 deletions src/pycpio/pycpio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pathlib import Path
from typing import Union

from zenlib.logging import loggify
from zenlib.logging import LoggerMixIn

from pycpio.cpio import CPIOArchive, CPIOData
from pycpio.header import HEADER_NEW
Expand All @@ -10,11 +10,11 @@
from pycpio.writer import CPIOWriter


@loggify
class PyCPIO:
class PyCPIO(LoggerMixIn):
"""A class for using CPIO archives."""

def __init__(self, structure=HEADER_NEW, reproducible=False, *args, **kwargs):
self.init_logger(args, kwargs)
self.structure = structure
self.reproducible = reproducible
self.overrides = {}
Expand All @@ -25,7 +25,7 @@ def __init__(self, structure=HEADER_NEW, reproducible=False, *args, **kwargs):
self.logger.info("[%s] Setting override: %s" % (attr, value))
self.overrides[attr] = value

def append_cpio(self, path: Path, name: str = None, *args, **kwargs):
def append_cpio(self, path: Path, name: Union[str, None] = None, *args, **kwargs):
"""Appends a file or directory to the CPIO archive."""
kwargs.update({"path": path, "structure": self.structure, "overrides": self.overrides, "logger": self.logger})
if name:
Expand All @@ -47,9 +47,9 @@ def add_symlink(self, name: str, target: str):

def add_chardev(self, name: str, major: int, minor: int, *args, **kwargs):
"""Adds a character device to the CPIO archive."""
self._build_cpio_entry(
name=name, entry_type=CPIOModes["CharDev"].value, rdevmajor=major, rdevminor=minor, *args, **kwargs
)
kwargs["name"] = name
kwargs["entry_type"] = CPIOModes["CharDev"].value
self._build_cpio_entry(rdevmajor=major, rdevminor=minor, *args, **kwargs)

def read_cpio_file(self, file_path: Path):
"""Creates a CPIOReader object and reads the file."""
Expand All @@ -67,7 +67,7 @@ def list_files(self):
"""Returns a list of files in the CPIO archive."""
return str(self.entries.list())

def _build_cpio_entry(self, name: str, entry_type: CPIOModes, data=None, *args, **kwargs):
def _build_cpio_entry(self, name: str, entry_type: int, data=None, *args, **kwargs):
"""Creates a CPIOData object and adds it to the CPIO archive."""
overrides = self.overrides.copy()
if mode := kwargs.pop("mode", None):
Expand Down
Loading