import functools
import io
import os.path
import re
import stat
import tarfile
from typing import Callable, Dict, Iterable, List, Optional, Tuple
from . import hashing
from .exceptions import TplBuildContextException, TplBuildException
@functools.lru_cache(maxsize=2**16)
def _hash_file(path: str) -> str:
    """
    Return the hex digest of the contents of the file at `path`.

    The file is consumed in 64 KiB chunks so large files are never held in
    memory at once. Results are memoized per path by the `lru_cache`
    decorator, so a path is only read the first time it is requested.
    """
    chunk_size = 2**16
    digest = hashing.HASHER()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops as soon as read() returns b"" (EOF).
        for chunk in iter(functools.partial(stream.read, chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
def _create_pattern_part(
path_pat: str, *, allow_double_star: bool = True
) -> Tuple[str, bool]:
"""
Returns (regex_pattern, simple) tuple where `regex_pattern` is a regex that
recognizes the passed pattern and `simple` indicates if the pattern is a
"simple" pattern, i.e. matching only a single literal.
`path_pat` is to be interpretted as described in
https://pkg.go.dev/path/filepath#Match except that "**" should match any
number of directories.
Raises a ValueError if path_pat is malformed.
"""
assert os.path.sep not in path_pat
if path_pat == "**" and allow_double_star:
# Match any number of directories
return ".*", False
result = [re.escape(os.path.sep)]
simple = True
i = 0
while i < len(path_pat):
ch = path_pat[i]
i += 1
if ch == "\\":
if i == len(path_pat):
raise ValueError("Trailing escape character")
result.append(re.escape(path_pat[i]))
i += 1
elif ch in "*?":
simple = False
result.append(f"[^{os.path.sep}]{ch}")
elif ch == "[":
simple = False
range_start = None
cclass_empty = True
char_avail = False
result.append("[")
# Check for character class negation
if i < len(path_pat) and path_pat[i] == "^":
result.append("^")
i += 1
while True:
if i == len(path_pat):
raise ValueError("Unclosed character class")
ch = path_pat[i]
i += 1
if ch == "\\":
if i == len(path_pat):
raise ValueError("Trailing escape character")
ch = path_pat[i]
i += 1
elif ch == "]":
if range_start is not None:
raise ValueError("Unclosed character range")
if cclass_empty:
raise ValueError("Empty character class")
break
elif ch == "-":
if not char_avail:
raise ValueError("Unexpected '-' in character class")
range_start = result[-1]
result.append("-")
char_avail = False
continue
if range_start is not None:
if ord(range_start) > ord(ch):
raise ValueError("Invalid character range")
range_start = None
else:
char_avail = True
result.append(re.escape(ch))
cclass_empty = False
result.append("]")
else:
result.append(re.escape(ch))
return "".join(result), simple
def _create_pattern(
    path_pat: str, match_prefix: bool, *, allow_double_star: bool = True
) -> str:
    """
    Build the regex source for a complete, separator-delimited path pattern.

    The resulting regex is anchored at the start of the path and accepts
    either the end of the string or a path separator after the final
    component, so a pattern also covers everything beneath a matched path.

    If `match_prefix` is True, and all but the last component of `path_pat`
    is simple, then any path that matches a prefix of path components will also
    be matched by this pattern. For example "a/b/c/*.txt" will match "a", "a/b",
    and "a/b/c". If the pattern was instead "a/*/c/*.txt" then no prefix matching
    will happen at all because the second component is not simple.

    Raises a ValueError (from `_create_pattern_part`) if a component is
    malformed.
    """
    sep_regex = re.escape(os.path.sep)
    compiled = [
        _create_pattern_part(component, allow_double_star=allow_double_star)
        for component in path_pat.split(os.path.sep)
    ]
    regexes = [regex for regex, _ in compiled]
    leading_simple = all(is_simple for _, is_simple in compiled[:-1])

    if match_prefix and leading_simple:
        # Nest one optional group per component: after each component the
        # path may either end or continue with the next component.
        nested = "".join(regex + "(?:$|" for regex in regexes)
        return "^" + nested + sep_regex + ")" * len(regexes)

    return "^" + "".join(regexes) + f"(?:$|{sep_regex})"
class ContextPattern:
    """
    A single ignore-file style pattern controlling which files are
    available in a build context.

    A pattern beginning with "!" is a negated pattern: paths matching it are
    *not* ignored. Negated patterns are compiled with prefix matching enabled;
    all other patterns are compiled without it.

    Attributes:
        ignoring (bool): Flag indicating if matching this pattern means the
            matched element should be ignored or not ignored.
        pattern: The compiled regular expression used for matching.

    Raises:
        TplBuildContextException: If the pattern text is malformed.
    """

    def __init__(self, pattern: str):
        negated = pattern.startswith("!")
        try:
            # Prefix matching is enabled exactly for negated patterns.
            regex_source = _create_pattern(
                pattern[1:] if negated else pattern, negated
            )
            compiled = re.compile(regex_source)
        except ValueError as exc:
            raise TplBuildContextException(
                f"Error handling {repr(pattern)}: {exc}"
            ) from exc
        self.ignoring = not negated
        self.pattern = compiled

    def matches(self, path: str) -> bool:
        """Returns True if this pattern matches the path"""
        return self.pattern.search(path) is not None
def _apply_umask(mode: int, umask: Optional[int]) -> int:
"""
Copy the user permission bits to group and all bits and then apply
the supplied umask. If umask is None just return mode instead.
"""
if umask is None:
return mode
umode = (mode >> 6) & 0o7
mode &= ~0o777
mode |= ((umode << 6) | (umode << 3) | umode) & ~umask
return mode
def _stat_to_tarinfo(
    base_path: str, arch_path: str, *, umask: Optional[int] = None, follow_link=True
) -> tarfile.TarInfo:
    """
    Build a TarInfo describing the filesystem object at
    `os.path.join(base_path, arch_path)`.

    Args:
        base_path: Directory that `arch_path` is relative to.
        arch_path: Path of the object relative to `base_path`. It becomes the
            archive member name, rewritten to start with "/" ("." maps to
            "/" and a leading "./" is reduced to "/").
        umask: Passed to `_apply_umask` together with the object's st_mode.
        follow_link: If True the object is examined with `os.stat`, otherwise
            with `os.lstat` (so a symlink is recorded as a symlink).

    The owner is always recorded as root (uid/gid 0) and the mtime as 0. Size
    is only recorded for regular files.

    Raises:
        TplBuildException: If the object is not a regular file, directory,
            FIFO, symlink, character device or block device.
    """
    full_path = os.path.join(base_path, arch_path)
    statres = os.stat(full_path) if follow_link else os.lstat(full_path)
    stmd = statres.st_mode

    # Map each file-type predicate to the tar member type it corresponds to.
    type_table = (
        (stat.S_ISREG, tarfile.REGTYPE),
        (stat.S_ISDIR, tarfile.DIRTYPE),
        (stat.S_ISFIFO, tarfile.FIFOTYPE),
        (stat.S_ISLNK, tarfile.SYMTYPE),
        (stat.S_ISCHR, tarfile.CHRTYPE),
        (stat.S_ISBLK, tarfile.BLKTYPE),
    )
    for is_type, member_type in type_table:
        if is_type(stmd):
            typ = member_type
            break
    else:
        raise TplBuildException("Unsupported file mode in context")

    # Archive names are always rooted at "/".
    if arch_path == ".":
        member_name = "/"
    elif arch_path.startswith("./"):
        member_name = arch_path[1:]
    else:
        member_name = "/" + arch_path

    tarinfo = tarfile.TarInfo()
    tarinfo.name = member_name
    tarinfo.mode = _apply_umask(stmd, umask)
    tarinfo.uid = 0
    tarinfo.gid = 0
    tarinfo.uname = "root"
    tarinfo.gname = "root"
    tarinfo.size = statres.st_size if typ == tarfile.REGTYPE else 0
    tarinfo.mtime = 0
    tarinfo.type = typ
    tarinfo.linkname = os.readlink(full_path) if typ == tarfile.SYMTYPE else ""

    is_device = typ in (tarfile.CHRTYPE, tarfile.BLKTYPE)
    # os.major/os.minor are not available on every platform.
    if is_device and hasattr(os, "major") and hasattr(os, "minor"):
        tarinfo.devmajor = os.major(statres.st_rdev)
        tarinfo.devminor = os.minor(statres.st_rdev)
    return tarinfo
class BuildContext:
    """
    Class representing and capable of writing a build context.

    Args:
        base_dir: The base directory of the build context. If None the
            context consists only of an empty root directory (plus any extra
            files supplied when walking/writing).
        umask: If not None, the user permission bits will be copied to the
            'group' and 'all' bits and then the umask will be applied. If
            None then the exact file permissions will be forwarded to the
            build context.
        ignore_patterns: An iterable of ignore patterns in the order they
            should be tested. A path will be ignored if the last pattern it
            matches in the list is not negated. Blank lines and lines whose
            first non-blank character is "#" are skipped. This is meant to
            mirror the behavior and semantics of
            https://docs.docker.com/engine/reference/builder/#dockerignore-file
    """

    def __init__(
        self,
        base_dir: Optional[str],
        umask: Optional[int],
        ignore_patterns: Iterable[str],
    ) -> None:
        self.base_dir = base_dir
        self.umask = umask
        stripped_patterns = (pattern.strip() for pattern in ignore_patterns)
        self.context_patterns = tuple(
            ContextPattern(pattern)
            for pattern in stripped_patterns
            if pattern and pattern[0] != "#"
        )

    def ignored(self, path: str) -> bool:
        """
        Returns True if the given path should be ignored (not present) in
        the build context. `path` should start with a directory separator
        and should be relative to `self.base_dir`.
        """
        ignored = False
        for pattern in self.context_patterns:
            # Only a pattern that would flip the current verdict needs to be
            # tested; the last matching pattern wins.
            if pattern.ignoring != ignored and pattern.matches(path):
                ignored = pattern.ignoring
        return ignored

    def walk_context(
        self,
        *,
        extra_files: Optional[Dict[str, Tuple[int, bytes]]] = None,
        ignore_func: Optional[Callable[[str], bool]] = None,
    ) -> Iterable[tarfile.TarInfo]:
        """
        Generator that yields TarInfo objects for each not-ignored object
        in the context. Objects from `base_dir` are yielded in a
        deterministic order based on the names; entries for `extra_files`
        follow in the mapping's iteration order.

        Args:
            extra_files: Mapping of archive name to `(file mode, file data)`
                for additional regular files to describe after the walk.
            ignore_func: Optional extra predicate; a path is skipped if
                either the ignore patterns or this function reject it.
        """

        def _is_ignored(path: str) -> bool:
            if self.ignored(path):
                return True
            return ignore_func is not None and ignore_func(path)

        if self.base_dir is None:
            tarinfo = tarfile.TarInfo("/")
            tarinfo.mode = _apply_umask(0o777, self.umask)
            tarinfo.type = tarfile.DIRTYPE
            yield tarinfo
        else:
            for root, dir_names, file_names in os.walk(self.base_dir):
                arch_root = os.path.relpath(root, self.base_dir)
                tarinfo = _stat_to_tarinfo(self.base_dir, arch_root, umask=self.umask)
                yield tarinfo

                # Names emitted directly as members of this directory.
                leaf_names = [
                    file_name
                    for file_name in file_names
                    if not _is_ignored(os.path.join(tarinfo.name, file_name))
                ]

                # os.walk reports symlinks to directories in `dir_names` but
                # never descends into them (followlinks is False), so they
                # would otherwise vanish from the context entirely. Emit them
                # as symlink entries alongside the regular files instead.
                real_dirs = []
                for dir_name in dir_names:
                    if _is_ignored(os.path.join(tarinfo.name, dir_name)):
                        continue
                    if os.path.islink(os.path.join(root, dir_name)):
                        leaf_names.append(dir_name)
                    else:
                        real_dirs.append(dir_name)

                # Assigning in place prunes ignored directories from the walk
                # and fixes the traversal order.
                dir_names[:] = sorted(real_dirs)

                for leaf_name in sorted(leaf_names):
                    yield _stat_to_tarinfo(
                        self.base_dir,
                        os.path.join(arch_root, leaf_name),
                        umask=self.umask,
                        follow_link=False,
                    )

        extra_files = extra_files or {}
        for file_name, (file_mode, file_data) in extra_files.items():
            tarinfo = tarfile.TarInfo(file_name)
            tarinfo.mode = _apply_umask(file_mode, self.umask)
            tarinfo.size = len(file_data)
            yield tarinfo

    def write_context(
        self,
        io_out: io.BytesIO,
        *,
        extra_files: Optional[Dict[str, Tuple[int, bytes]]] = None,
        compress: bool = False,
    ) -> None:
        """
        Write the context to `io_out`.

        Args:
            io_out: The file-like object to write the build context to as a tar file.
            extra_files: Extra file data to add at the root of the archive.
                This maps archive name to (file mode, file data).
            compress: If set the output stream will be gzipped.
        """
        extra_files = extra_files or {}
        with tarfile.open(
            fileobj=io_out,
            format=tarfile.PAX_FORMAT,
            mode=("w|gz" if compress else "w|"),
        ) as tf:
            for tarinfo in self.walk_context(extra_files=extra_files):
                if tarinfo.type != tarfile.REGTYPE:
                    # Directories, links, devices, ... carry no payload.
                    tf.addfile(tarinfo)
                    continue

                extra_data = extra_files.get(tarinfo.name)
                if extra_data:
                    tf.addfile(tarinfo, fileobj=io.BytesIO(extra_data[1]))
                else:
                    assert self.base_dir is not None
                    # Member names start with "/"; prefixing "." makes them
                    # relative so they can be joined onto base_dir.
                    with open(
                        os.path.join(self.base_dir, "." + tarinfo.name), "rb"
                    ) as fileobj:
                        tf.addfile(tarinfo, fileobj=fileobj)

    def compute_partial_hash(
        self,
        *,
        patterns: Optional[List[str]] = None,
    ) -> str:
        """
        Compute a partial hash of the context where all hashed files must match
        at least one file pattern in `patterns`. If `patterns` is None or
        empty, every not-ignored object in the context is hashed.

        The hash covers the tar metadata of each walked object and, for
        regular files, the hash of the file contents.
        """
        pats = [
            re.compile(_create_pattern(pat, True, allow_double_star=False))
            for pat in (patterns or [])
        ]

        def _ignore_func(path: str) -> bool:
            return not any(pat.search(path) for pat in pats)

        hsh = hashing.HASHER()
        for tarinfo in self.walk_context(
            ignore_func=_ignore_func if patterns else None
        ):
            info = tarinfo.get_info()
            # The member type is bytes, which cannot be JSON encoded as-is.
            info["type"] = info["type"].decode("utf-8")  # type: ignore
            hsh.update(hashing.json_hash(info).encode("utf-8"))
            if tarinfo.type == tarfile.REGTYPE:
                assert self.base_dir is not None
                hsh.update(
                    _hash_file(os.path.join(self.base_dir, "." + tarinfo.name)).encode(
                        "utf-8"
                    )
                )

        return hashing.json_hash(
            [
                type(self).__name__,
                "full",
                hsh.hexdigest(),
            ]
        )

    @functools.cached_property
    def full_hash(self) -> str:
        """The full content hash of the build context, as a hex digest"""
        return self.compute_partial_hash()

    @functools.cached_property
    def symbolic_hash(self) -> str:
        """
        The symbolic content hash of the build context, as a hex digest. This
        is different from :attr:`full_hash` in that it does not read any files
        from the build context and is only a hash of the parameters that define
        the build context instead.
        """
        return hashing.json_hash(
            [
                type(self).__name__,
                "symbolic",
                self.umask,
                self.base_dir,
                [[pat.ignoring, pat.pattern.pattern] for pat in self.context_patterns],
            ]
        )