Skip to content

Commit

Permalink
Improve documentation; return copy/write status; add tests for io.
Browse files Browse the repository at this point in the history
  • Loading branch information
felixfontein committed Sep 8, 2024
1 parent 814c1e9 commit 2cbe6bc
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions src/antsibull_fileutils/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def copy_file(
check_content: bool = True,
file_check_content: int = 0,
chunksize: int,
) -> None:
) -> bool:
"""
Copy content from one file to another.
Expand All @@ -33,6 +33,7 @@ async def copy_file(
:kwarg check_content: If ``True`` (default) and ``file_check_content > 0`` and the
destination file exists, first check whether source and destination are potentially equal
before actually copying,
:return: ``True`` if the file was actually copied.
"""
if check_content and file_check_content > 0:
# Check whether the destination file exists and has the same content as the source file,
Expand All @@ -48,12 +49,12 @@ async def copy_file(
async with aiofiles.open(dest_path, "rb") as f_in:
existing_content = await f_in.read()
if content_to_copy == existing_content:
return
return False
# Since we already read the contents of the file to copy, simply write it to
# the destination instead of reading it again
async with aiofiles.open(dest_path, "wb") as f_out:
await f_out.write(content_to_copy)
return
return True
except FileNotFoundError:
# Destination (or source) file does not exist
pass
Expand All @@ -62,13 +63,29 @@ async def copy_file(
async with aiofiles.open(dest_path, "wb") as f_out:
while chunk := await f_in.read(chunksize):
await f_out.write(chunk)
return True


async def write_file(
filename: StrOrBytesPath, content: str, *, file_check_content: int = 0
) -> None:
content_bytes = content.encode("utf-8")
filename: StrOrBytesPath,
content: str,
*,
file_check_content: int = 0,
encoding: str = "utf-8",
) -> bool:
"""
Write encoded content to file.
:arg filename: The filename to write to.
:arg content: The content to write to the file.
:kwarg file_check_content: If > 0 and the file exists and its size in bytes does not exceed this
value, will read the file and compare it to the encoded content before overwriting.
:return: ``True`` if the file was actually written.
"""

content_bytes = content.encode(encoding)

print(file_check_content, len(content_bytes))
if file_check_content > 0 and len(content_bytes) <= file_check_content:
# Check whether the destination file exists and has the same content as the one we want to
# write, in which case we won't overwrite the file
Expand All @@ -79,16 +96,23 @@ async def write_file(
async with aiofiles.open(filename, "rb") as f:
existing_content = await f.read()
if existing_content == content_bytes:
return
return False
except FileNotFoundError:
# Destination file does not exist
pass

async with aiofiles.open(filename, "wb") as f:
await f.write(content_bytes)
return True


async def read_file(filename: StrOrBytesPath, *, encoding: str = "utf-8") -> str:
"""
Read the file and decode its contents with the given encoding.
:arg filename: The filename to read from.
:kwarg encoding: The encoding to use.
"""
async with aiofiles.open(filename, "r", encoding=encoding) as f:
content = await f.read()

Expand Down

0 comments on commit 2cbe6bc

Please sign in to comment.