Skip to content

Commit

Permalink
Rewrite copier code to improve symlink handling and preserving metada…
Browse files Browse the repository at this point in the history
…ta (#8)

* Rewrite copier code.

* Normalize symlinks by default.
  • Loading branch information
felixfontein authored Oct 22, 2024
1 parent d614f9d commit e429f1a
Show file tree
Hide file tree
Showing 4 changed files with 385 additions and 57 deletions.
2 changes: 0 additions & 2 deletions changelogs/fragments/7-symlink.yml

This file was deleted.

6 changes: 6 additions & 0 deletions changelogs/fragments/8-copier.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
minor_changes:
- "Rewrite ``Copier`` and ``GitCopier`` so that both symlinks outside the tree and symlinks inside the tree are handled more correctly:
symlinks inside the tree are kept, while for symlinks outside the tree the content is copied. Symlinks are normalized by default,
which makes this behavior similar to ansible-core's behavior in ``ansible-galaxy collection build``.
Also, copying now tries to preserve metadata
(https://github.com/ansible-community/antsibull-fileutils/pull/8)."
149 changes: 131 additions & 18 deletions src/antsibull_fileutils/copier.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,127 @@ class CopierError(Exception):
pass


def _is_internal(directory: str, link: str) -> bool:
    """
    Tell whether a symlink target stays inside the tree.

    ``directory`` is the directory containing the symlink, given relative to
    the tree root; ``link`` is the symlink's target. The target is joined onto
    ``directory`` and normalized. Absolute results, results carrying a drive
    specifier, and results that climb above the tree root (``..`` or
    ``../...``) are reported as not internal.
    """
    target = os.path.join(directory, link)
    # Absolute paths and paths with a drive can never be tree-relative.
    if os.path.isabs(target) or os.path.splitdrive(target)[0]:
        return False

    # Once normalized, a path that leaves the root begins with a ".." component.
    normalized = os.path.normpath(target)
    if normalized == "..":
        return False
    return not normalized.startswith(".." + os.sep)


class _TreeCopier:
    """
    Helper that copies files, directories, and symlinks from ``source`` into a
    freshly created ``dest`` directory.

    Symlinks are either recreated as symlinks or replaced by a copy of the
    content they point to, depending on ``keep_inside_symlinks`` /
    ``keep_outside_symlinks`` and on whether the link target stays inside the
    source tree. Regular files are copied with ``shutil.copy2`` and directory
    metadata is copied with ``shutil.copystat``.
    """

    def __init__(
        self,
        source: StrPath,
        dest: StrPath,
        *,
        keep_inside_symlinks: bool = True,
        keep_outside_symlinks: bool = False,
        normalize_links: bool = True,
        log_debug: t.Callable[[str], None] | None = None,
    ):
        """
        Initialize copy helper and create the destination directory.

        ``source`` is the root of the tree to copy from and ``dest`` the root
        to copy to. ``dest`` is created here with mode ``0o700``;
        ``os.mkdir`` raises ``FileExistsError`` if it already exists.

        ``keep_inside_symlinks`` / ``keep_outside_symlinks`` select whether
        symlinks pointing inside / outside the tree are recreated as symlinks
        (``True``) or replaced by the content they point to (``False``).
        ``normalize_links`` rewrites link targets relative to the directory
        containing the link before they are classified and recreated.
        ``log_debug`` is an optional callback receiving debug messages.
        """
        self._log_debug = log_debug
        # Tree-relative directories that already exist in dest. The root is
        # created below, so both of its spellings are pre-registered.
        self.created_directories: set[str] = {".", ""}
        self.source = source
        self.dest = dest
        self.keep_inside_symlinks = keep_inside_symlinks
        self.keep_outside_symlinks = keep_outside_symlinks
        # When no kind of symlink is kept, classifying links is unnecessary.
        self.never_keep = not (keep_inside_symlinks or keep_outside_symlinks)
        self.normalize_links = normalize_links
        os.mkdir(self.dest, mode=0o700)

    def _do_log_debug(self, msg: str, *args: t.Any) -> None:
        """
        Forward ``msg`` and ``args`` to the debug callback, if one was given.
        """
        if self._log_debug:
            self._log_debug(msg, *args)

    def _copy_link(self, directory: str, full_source: str, full_dest: str) -> None:
        """
        Copy the symlink ``full_source`` to ``full_dest``.

        ``directory`` is the tree-relative directory containing the link.
        Depending on the keep settings the link is recreated as a symlink, or
        the file / directory tree it resolves to is copied in its place.
        """
        link = os.readlink(full_source)
        if self.normalize_links:
            # Express the target relative to the directory holding the link.
            full_directory = os.path.join(self.source, directory)
            link = os.path.relpath(os.path.join(full_directory, link), full_directory)

        internal = False if self.never_keep else _is_internal(directory, link)
        keep = self.keep_inside_symlinks if internal else self.keep_outside_symlinks
        if keep:
            self._do_log_debug("Copying symlink {!r} to {!r}", full_source, full_dest)
            os.symlink(link, full_dest)
            shutil.copystat(full_source, full_dest, follow_symlinks=False)
            return

        # The link is not kept: copy what it resolves to instead.
        real_source = os.path.realpath(full_source)
        if os.path.isdir(real_source):
            self._do_log_debug(
                "Copying symlinked directory tree {!r} to {!r}", full_source, full_dest
            )
            shutil.copytree(real_source, full_dest, symlinks=False)
            return

        self._do_log_debug(
            "Copying symlinked file {!r} to {!r}", full_source, full_dest
        )
        shutil.copy2(real_source, full_dest)

    def _create_dir(self, directory: str) -> None:
        """
        Create the tree-relative ``directory`` in the destination, copying the
        source directory's metadata, unless it was already created.
        """
        if directory not in self.created_directories:
            src_dir = os.path.join(self.source, directory)
            dest_dir = os.path.join(self.dest, directory)
            self._do_log_debug("Copying directory {!r} to {!r}", src_dir, dest_dir)
            os.makedirs(dest_dir, mode=0o700, exist_ok=True)
            shutil.copystat(src_dir, dest_dir, follow_symlinks=False)
            self.created_directories.add(directory)

    def _copy_file(self, directory: str, relative_path: str) -> None:
        """
        Copy the entry ``relative_path`` (file or symlink) into the
        destination, creating its containing ``directory`` first.
        """
        self._create_dir(directory)

        full_source = os.path.join(self.source, relative_path)
        full_dest = os.path.join(self.dest, relative_path)
        if os.path.islink(full_source):
            self._copy_link(directory, full_source, full_dest)
        else:
            self._do_log_debug("Copying file {!r} to {!r}", full_source, full_dest)
            shutil.copy2(full_source, full_dest)

    def copy_file(
        self, relative_path: str, *, ignore_non_existing: bool = False
    ) -> None:
        """
        Copy a single entry given by its tree-relative path.

        With ``ignore_non_existing=True`` the call silently does nothing when
        the source entry does not exist. Broken symlinks count as existing,
        since ``os.path.lexists`` is used for the check.
        """
        if ignore_non_existing and not os.path.lexists(
            os.path.join(self.source, relative_path)
        ):
            return
        directory, _ = os.path.split(relative_path)
        self._copy_file(directory, relative_path)

    def walk(self) -> None:
        """
        Copy the complete source tree into the destination.
        """
        for root, dirs, files in os.walk(self.source, followlinks=False):
            directory = os.path.relpath(root, self.source)
            if directory == ".":
                directory = ""
            for file in files:
                relative_path = os.path.join(directory, file)
                self._copy_file(directory, relative_path)
            for a_dir in dirs:
                relative_path = os.path.join(directory, a_dir)
                if os.path.islink(os.path.join(self.source, relative_path)):
                    # os.walk() reports symlinks to directories in ``dirs``
                    # and, with followlinks=False, never descends into them.
                    # Treat them as links so they are kept or content-copied
                    # instead of turning into empty directories.
                    self._copy_file(directory, relative_path)
                else:
                    self._create_dir(relative_path)


class Copier:
"""
Allows to copy directories.
"""

def __init__(self, *, log_debug: t.Callable[[str], None] | None = None):
def __init__(
self,
*,
normalize_links: bool = True,
log_debug: t.Callable[[str], None] | None = None,
):
self.normalize_links = normalize_links
self._log_debug = log_debug

def _do_log_debug(self, msg: str, *args: t.Any) -> None:
Expand All @@ -46,7 +161,12 @@ def copy(self, from_path: StrPath, to_path: StrPath) -> None:
self._do_log_debug(
"Copying complete directory from {!r} to {!r}", from_path, to_path
)
shutil.copytree(from_path, to_path, symlinks=True)
_TreeCopier(
from_path,
to_path,
normalize_links=self.normalize_links,
log_debug=self._log_debug,
).walk()


class GitCopier(Copier):
Expand All @@ -57,10 +177,11 @@ class GitCopier(Copier):
def __init__(
self,
*,
normalize_links: bool = True,
git_bin_path: StrPath = "git",
log_debug: t.Callable[[str], None] | None = None,
):
super().__init__(log_debug=log_debug)
super().__init__(normalize_links=normalize_links, log_debug=log_debug)
self.git_bin_path = git_bin_path

def copy(self, from_path: StrPath, to_path: StrPath) -> None:
Expand All @@ -77,25 +198,17 @@ def copy(self, from_path: StrPath, to_path: StrPath) -> None:
self._do_log_debug(
"Copying {} file(s) from {!r} to {!r}", len(files), from_path, to_path
)
os.mkdir(to_path, mode=0o700)
created_directories = set()
tc = _TreeCopier(
from_path,
to_path,
normalize_links=self.normalize_links,
log_debug=self._log_debug,
)
for file in files:
# Decode filename and check whether the file still exists
# (deleted files are part of the output)
file_decoded = file.decode("utf-8")
src_path = os.path.join(from_path, file_decoded)
if not os.path.exists(src_path):
continue

# Check whether the directory for this file exists
directory, _ = os.path.split(file_decoded)
if directory not in created_directories:
os.makedirs(os.path.join(to_path, directory), mode=0o700, exist_ok=True)
created_directories.add(directory)

# Copy the file
dst_path = os.path.join(to_path, file_decoded)
shutil.copyfile(src_path, dst_path, follow_symlinks=False)
tc.copy_file(file_decoded, ignore_non_existing=True)


class CollectionCopier:
Expand Down
Loading

0 comments on commit e429f1a

Please sign in to comment.