diff --git a/changelogs/fragments/7-symlink.yml b/changelogs/fragments/7-symlink.yml deleted file mode 100644 index 5222adc..0000000 --- a/changelogs/fragments/7-symlink.yml +++ /dev/null @@ -1,2 +0,0 @@ -bugfixes: - - "Make ``GitCopier`` handle symlinks correctly (https://github.com/ansible-community/antsibull-fileutils/pull/7)." diff --git a/changelogs/fragments/8-copier.yml b/changelogs/fragments/8-copier.yml new file mode 100644 index 0000000..dbaadef --- /dev/null +++ b/changelogs/fragments/8-copier.yml @@ -0,0 +1,6 @@ +minor_changes: + - "Rewrite ``Copier`` and ``GitCopier`` so that both symlinks outside the tree and symlinks inside the tree are handled more correctly: + symlinks inside the tree are kept, while for symlinks outside the tree the content is copied. Symlinks are normalized by default, + which makes this behavior similar to ansible-core's behavior in ``ansible-galaxy collection build``. + Also copying now tries to preserve metadata + (https://github.com/ansible-community/antsibull-fileutils/pull/8)." diff --git a/src/antsibull_fileutils/copier.py b/src/antsibull_fileutils/copier.py index 74a26ad..4b2a8bd 100644 --- a/src/antsibull_fileutils/copier.py +++ b/src/antsibull_fileutils/copier.py @@ -25,12 +25,127 @@ class CopierError(Exception): pass +def _is_internal(directory: str, link: str) -> bool: + dest = os.path.join(directory, link) + if os.path.isabs(dest): + return False + + if os.path.splitdrive(dest)[0]: + return False + + normpath = os.path.normpath(dest) + return not (normpath == ".." or normpath.startswith(".." + os.sep)) + + +class _TreeCopier: + def __init__( + self, + source: StrPath, + dest: StrPath, + *, + keep_inside_symlinks: bool = True, + keep_outside_symlinks: bool = False, + normalize_links: bool = True, + log_debug: t.Callable[[str], None] | None = None, + ): + """ + Initialize copy helper + """ + self._log_debug = log_debug + self.created_directories: set[str] = {".", ""} + self.source = source + self.dest = dest + self.keep_inside_symlinks = keep_inside_symlinks + self.keep_outside_symlinks = keep_outside_symlinks + self.never_keep = not (keep_inside_symlinks or keep_outside_symlinks) + self.normalize_links = normalize_links + os.mkdir(self.dest, mode=0o700) + + def _do_log_debug(self, msg: str, *args: t.Any) -> None: + if self._log_debug: + self._log_debug(msg, *args) + + def _copy_link(self, directory: str, full_source: str, full_dest: str) -> None: + link = os.readlink(full_source) + if self.normalize_links: + full_directory = os.path.join(self.source, directory) + link = os.path.relpath(os.path.join(full_directory, link), full_directory) + + internal = False if self.never_keep else _is_internal(directory, link) + keep = self.keep_inside_symlinks if internal else self.keep_outside_symlinks + if keep: + self._do_log_debug("Copying symlink {!r} to {!r}", full_source, full_dest) + os.symlink(link, full_dest) + shutil.copystat(full_source, full_dest, follow_symlinks=False) + return + + real_source = os.path.realpath(full_source) + if os.path.isdir(real_source): + self._do_log_debug( + "Copying symlinked directory tree {!r} to {!r}", full_source, full_dest + ) + shutil.copytree(real_source, full_dest, symlinks=False) + return + + self._do_log_debug( + "Copying symlinked file {!r} to {!r}", full_source, full_dest + ) + shutil.copy2(real_source, full_dest) + + def _create_dir(self, directory: str) -> None: + if directory not in self.created_directories: + src_dir = os.path.join(self.source, directory) + dest_dir = os.path.join(self.dest, directory) + self._do_log_debug("Copying directory {!r} to {!r}", src_dir, dest_dir) + os.makedirs(dest_dir, mode=0o700, exist_ok=True) + shutil.copystat(src_dir, dest_dir, follow_symlinks=False) + self.created_directories.add(directory) + + def _copy_file(self, directory: str, relative_path: str) -> None: + self._create_dir(directory) + + full_source = os.path.join(self.source, relative_path) + full_dest = os.path.join(self.dest, relative_path) + if os.path.islink(full_source): + self._copy_link(directory, full_source, full_dest) + else: + self._do_log_debug("Copying file {!r} to {!r}", full_source, full_dest) + shutil.copy2(full_source, full_dest) + + def copy_file( + self, relative_path: str, *, ignore_non_existing: bool = False + ) -> None: + if ignore_non_existing and not os.path.lexists( + os.path.join(self.source, relative_path) + ): + return + directory, _ = os.path.split(relative_path) + self._copy_file(directory, relative_path) + + def walk(self): + for root, dirs, files in os.walk(self.source, followlinks=False): + directory = os.path.relpath(root, self.source) + if directory == ".": + directory = "" + for file in files: + relative_path = os.path.join(directory, file) + self._copy_file(directory, relative_path) + for a_dir in dirs: + self._create_dir(os.path.join(directory, a_dir)) + + class Copier: """ Allows to copy directories. """ - def __init__(self, *, log_debug: t.Callable[[str], None] | None = None): + def __init__( + self, + *, + normalize_links: bool = True, + log_debug: t.Callable[[str], None] | None = None, + ): + self.normalize_links = normalize_links self._log_debug = log_debug def _do_log_debug(self, msg: str, *args: t.Any) -> None: @@ -46,7 +161,12 @@ def copy(self, from_path: StrPath, to_path: StrPath) -> None: self._do_log_debug( "Copying complete directory from {!r} to {!r}", from_path, to_path ) - shutil.copytree(from_path, to_path, symlinks=True) + _TreeCopier( + from_path, + to_path, + normalize_links=self.normalize_links, + log_debug=self._log_debug, + ).walk() class GitCopier(Copier): @@ -57,10 +177,11 @@ class GitCopier(Copier): def __init__( self, *, + normalize_links: bool = True, git_bin_path: StrPath = "git", log_debug: t.Callable[[str], None] | None = None, ): - super().__init__(log_debug=log_debug) + super().__init__(normalize_links=normalize_links, log_debug=log_debug) self.git_bin_path = git_bin_path def copy(self, from_path: StrPath, to_path: StrPath) -> None: @@ -77,25 +198,17 @@ def copy(self, from_path: StrPath, to_path: StrPath) -> None: self._do_log_debug( "Copying {} file(s) from {!r} to {!r}", len(files), from_path, to_path ) - os.mkdir(to_path, mode=0o700) - created_directories = set() + tc = _TreeCopier( + from_path, + to_path, + normalize_links=self.normalize_links, + log_debug=self._log_debug, + ) for file in files: # Decode filename and check whether the file still exists # (deleted files are part of the output) file_decoded = file.decode("utf-8") - src_path = os.path.join(from_path, file_decoded) - if not os.path.exists(src_path): - continue - - # Check whether the directory for this file exists - directory, _ = os.path.split(file_decoded) - if directory not in created_directories: - os.makedirs(os.path.join(to_path, directory), mode=0o700, exist_ok=True) - created_directories.add(directory) - - # Copy the file - dst_path = os.path.join(to_path, file_decoded) - shutil.copyfile(src_path, dst_path, follow_symlinks=False) + tc.copy_file(file_decoded, ignore_non_existing=True) class CollectionCopier: diff --git a/tests/units/test_copier.py b/tests/units/test_copier.py index 5a74371..ccc8697 100644 --- a/tests/units/test_copier.py +++ b/tests/units/test_copier.py @@ -16,16 +16,45 @@ import pytest -from antsibull_fileutils.copier import CollectionCopier, Copier, CopierError, GitCopier +from antsibull_fileutils.copier import ( + CollectionCopier, + Copier, + CopierError, + GitCopier, + _is_internal, +) from .utils import collect_log -def assert_same(a: pathlib.Path, b: pathlib.Path) -> None: +@pytest.mark.parametrize( + "directory, link, expected", + [ + ("", "foo", True), + ("", "foo/bar", True), + ("", "foo/../bar", True), + ("", "foo/../../bar", False), + ("", "..", False), + ("", "../foo/bar", False), + ("foo", ".", True), + ("foo", "bar", True), + ("foo", "../bar", True), + ("foo", "../../bar", False), + ("foo", "..", True), + ], +) +def test__is_internal(directory, link, expected): + assert _is_internal(directory, link) == expected + + +def assert_same( + a: pathlib.Path, b: pathlib.Path, *, changed_link: str | None = None +) -> None: if a.is_symlink(): assert b.is_symlink() - assert a.readlink() == b.readlink() + assert (changed_link or a.readlink()) == b.readlink() return + assert not b.is_symlink() if a.is_file(): assert b.is_file() assert a.read_bytes() == b.read_bytes() @@ -35,55 +64,131 @@ def assert_same(a: pathlib.Path, b: pathlib.Path) -> None: return +def assert_same_recursively(a: pathlib.Path, b: pathlib.Path) -> None: + assert a.is_dir() + assert b.is_dir() + dest = {f.name: f for f in b.iterdir()} + for f in a.iterdir(): + f_other = dest.pop(f.name) + assert_same(f, f_other) + if f.is_dir(): + assert_same_recursively(f, f_other) + assert dest == {} + + def test_copier(tmp_path_factory): directory: pathlib.Path = tmp_path_factory.mktemp("copier") src_dir = directory / "src" - dest_dir = directory / "dest" + src_dir.mkdir() + (src_dir / "empty").touch() + (src_dir / "link").symlink_to("empty") + (src_dir / "dir").mkdir() + (src_dir / "file").write_text("content", encoding="utf-8") + (src_dir / "dir" / "binary_file").write_bytes(b"\x00\x01\x02") + (src_dir / "dir" / "another_file").write_text("more", encoding="utf-8") - with mock.patch( - "antsibull_fileutils.copier.shutil.copytree", - return_value=None, - ) as m: - copier = Copier() - copier.copy(str(src_dir), str(dest_dir)) - m.assert_called_with(str(src_dir), str(dest_dir), symlinks=True) + dest_dir = directory / "dest1" + copier = Copier() + copier.copy(str(src_dir), str(dest_dir)) + assert_same_recursively(src_dir, dest_dir) + + def src_dst(*appendix): + s = src_dir + d = dest_dir + for a in appendix: + s = s / a + d = d / a + return str(s), str(d) - with mock.patch( - "antsibull_fileutils.copier.shutil.copytree", - return_value=None, - ) as m: - kwargs, debug, info = collect_log(with_info=False) - copier = Copier(**kwargs) - copier.copy(str(src_dir), str(dest_dir)) - m.assert_called_with(str(src_dir), str(dest_dir), symlinks=True) - assert debug == [ - ( - "Copying complete directory from {!r} to {!r}", - (str(src_dir), str(dest_dir)), - ), - ] + dest_dir = directory / "dest2" + kwargs, debug, info = collect_log(with_info=False) + copier = Copier(**kwargs) + copier.copy(str(src_dir), str(dest_dir)) + assert_same_recursively(src_dir, dest_dir) + assert sorted(debug) == [ + ( + "Copying complete directory from {!r} to {!r}", + (str(src_dir), str(dest_dir)), + ), + ( + "Copying directory {!r} to {!r}", + src_dst("dir"), + ), + ( + "Copying file {!r} to {!r}", + src_dst("dir", "another_file"), + ), + ( + "Copying file {!r} to {!r}", + src_dst("dir", "binary_file"), + ), + ( + "Copying file {!r} to {!r}", + src_dst("empty"), + ), + ( + "Copying file {!r} to {!r}", + src_dst("file"), + ), + ( + "Copying symlink {!r} to {!r}", + src_dst("link"), + ), + ] - with mock.patch( - "antsibull_fileutils.copier.shutil.copytree", - return_value=None, - ) as m: - kwargs, debug, info = collect_log(with_info=False) - copier = Copier(**kwargs) - copier.copy(src_dir, dest_dir) - m.assert_called_with(src_dir, dest_dir, symlinks=True) - assert debug == [ - ("Copying complete directory from {!r} to {!r}", (src_dir, dest_dir)), - ] + dest_dir = directory / "dest3" + kwargs, debug, info = collect_log(with_info=False) + copier = Copier(**kwargs) + copier.copy(src_dir, dest_dir) + assert_same_recursively(src_dir, dest_dir) + assert sorted(debug) == [ + ("Copying complete directory from {!r} to {!r}", (src_dir, dest_dir)), + ( + "Copying directory {!r} to {!r}", + src_dst("dir"), + ), + ( + "Copying file {!r} to {!r}", + src_dst("dir", "another_file"), + ), + ( + "Copying file {!r} to {!r}", + src_dst("dir", "binary_file"), + ), + ( + "Copying file {!r} to {!r}", + src_dst("empty"), + ), + ( + "Copying file {!r} to {!r}", + src_dst("file"), + ), + ( + "Copying symlink {!r} to {!r}", + src_dst("link"), + ), + ] def test_git_copier(tmp_path_factory): directory: pathlib.Path = tmp_path_factory.mktemp("git-copier") + (directory / "other").write_bytes(b"other") + (directory / "other_dir").mkdir() + (directory / "other_dir" / "file").write_bytes(b"foo") + src_dir = directory / "src" src_dir.mkdir() (src_dir / "empty").touch() (src_dir / "link").symlink_to("empty") + (src_dir / "link_dir").symlink_to("dir") + (src_dir / "trick_link").symlink_to("../src/empty") + (src_dir / "out_link").symlink_to("../other") + (src_dir / "out_link_dir").symlink_to("../other_dir") + (src_dir / "abs_link").symlink_to(str((src_dir / "link").resolve())) + (src_dir / "dead_link").symlink_to("does-not-exist") + (src_dir / "out_dead_link").symlink_to("../does-not-exist") (src_dir / "dir").mkdir() (src_dir / "file").write_text("content", encoding="utf-8") (src_dir / "dir" / "binary_file").write_bytes(b"\x00\x01\x02") @@ -101,6 +206,14 @@ def test_git_copier(tmp_path_factory): assert {p.name for p in dest_dir.iterdir()} == {"file"} assert_same(src_dir / "file", dest_dir / "file") + def src_dst(*appendix): + s = src_dir + d = dest_dir + for a in appendix: + s = s / a + d = d / a + return str(s), str(d) + dest_dir = directory / "dest2" with mock.patch( "antsibull_fileutils.copier.list_git_files", @@ -114,6 +227,10 @@ def test_git_copier(tmp_path_factory): assert debug == [ ("Identifying files not ignored by Git in {!r}", (src_dir,)), ("Copying {} file(s) from {!r} to {!r}", (4, src_dir, dest_dir)), + ("Copying symlink {!r} to {!r}", src_dst("link")), + ("Copying directory {!r} to {!r}", src_dst("dir")), + ("Copying file {!r} to {!r}", src_dst("dir", "binary_file")), + ("Copying file {!r} to {!r}", src_dst("dir", "another_file")), ] assert dest_dir.is_dir() assert {p.name for p in dest_dir.iterdir()} == {"link", "dir"} @@ -126,7 +243,89 @@ def test_git_copier(tmp_path_factory): assert_same(src_dir / "dir" / "another_file", dest_dir / "dir" / "another_file") assert_same(src_dir / "dir" / "binary_file", dest_dir / "dir" / "binary_file") - dest_dir = directory / "dest2" + dest_dir = directory / "dest3" + with mock.patch( + "antsibull_fileutils.copier.list_git_files", + return_value=[ + b"link_dir", + b"trick_link", + b"out_link", + b"out_link_dir", + b"abs_link", + b"dead_link", + ], + ) as m: + copier = GitCopier(git_bin_path="/path/to/git", normalize_links=False) + copier.copy(str(src_dir), str(dest_dir)) + m.assert_called_with(str(src_dir), git_bin_path="/path/to/git", log_debug=None) + assert dest_dir.is_dir() + assert {p.name for p in dest_dir.iterdir()} == { + "link_dir", + "trick_link", + "out_link", + "out_link_dir", + "abs_link", + "dead_link", + } + assert_same(src_dir / "link_dir", dest_dir / "link_dir") + assert_same((src_dir / "trick_link").resolve(), dest_dir / "trick_link") + assert_same((src_dir / "out_link").resolve(), dest_dir / "out_link") + assert_same_recursively( + (src_dir / "out_link_dir").resolve(), dest_dir / "out_link_dir" + ) + assert_same((src_dir / "abs_link").resolve(), dest_dir / "abs_link") + assert_same(src_dir / "dead_dir", dest_dir / "dead_dir") + + dest_dir = directory / "dest4" + with mock.patch( + "antsibull_fileutils.copier.list_git_files", + return_value=[ + b"link_dir", + b"trick_link", + b"out_link", + b"out_link_dir", + b"abs_link", + b"dead_link", + ], + ) as m: + copier = GitCopier(git_bin_path="/path/to/git") + copier.copy(str(src_dir), str(dest_dir)) + m.assert_called_with(str(src_dir), git_bin_path="/path/to/git", log_debug=None) + assert dest_dir.is_dir() + assert {p.name for p in dest_dir.iterdir()} == { + "link_dir", + "trick_link", + "out_link", + "out_link_dir", + "abs_link", + "dead_link", + } + assert_same(src_dir / "link_dir", dest_dir / "link_dir") + assert_same( + src_dir / "trick_link", + dest_dir / "trick_link", + changed_link=pathlib.Path("empty"), + ) + assert_same((src_dir / "out_link").resolve(), dest_dir / "out_link") + assert_same_recursively( + (src_dir / "out_link_dir").resolve(), dest_dir / "out_link_dir" + ) + assert_same( + src_dir / "abs_link", + dest_dir / "abs_link", + changed_link=pathlib.Path("empty"), + ) + assert_same(src_dir / "dead_dir", dest_dir / "dead_dir") + + def src_dst(*appendix): + s = src_dir + d = dest_dir + for a in appendix: + s = s / a + d = d / a + return str(s), str(d) + + dest_dir = directory / "dest5" with mock.patch( "antsibull_fileutils.copier.list_git_files", side_effect=ValueError("nada"), @@ -139,6 +338,18 @@ def test_git_copier(tmp_path_factory): copier.copy(str(src_dir), str(dest_dir)) m.assert_called_with(str(src_dir), git_bin_path="/path/to/git", log_debug=None) + dest_dir = directory / "dest6" + with mock.patch( + "antsibull_fileutils.copier.list_git_files", + return_value=[ + b"out_dead_link", + ], + ) as m: + copier = GitCopier(git_bin_path="/path/to/git") + with pytest.raises(FileNotFoundError) as exc: + copier.copy(str(src_dir), str(dest_dir)) + m.assert_called_with(str(src_dir), git_bin_path="/path/to/git", log_debug=None) + def test_collection_copier(tmp_path_factory): src_dir: pathlib.Path = tmp_path_factory.mktemp("collection-copier")