Skip to content

Commit

Permalink
Lz4 for barman cloud backup/restore wal archive/restore
Browse files Browse the repository at this point in the history
  • Loading branch information
mikewallace1979 committed Sep 26, 2023
1 parent a5ade28 commit 366847c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 1 deletion.
7 changes: 7 additions & 0 deletions barman/clients/cloud_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,13 @@ def parse_arguments(args=None):
const="zstd",
dest="compression",
)
compression.add_argument(
"--lz4",
help="lz4-compress the backup while uploading to the cloud ",
action="store_const",
const="lz4",
dest="compression",
)
parser.add_argument(
"-h",
"--host",
Expand Down
69 changes: 68 additions & 1 deletion barman/clients/cloud_compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ def _try_import_zstandard():
return zstandard


def _try_import_lz4():
try:
import lz4.frame
except ImportError:
raise SystemExit("Missing required python module: lz4")
return lz4.frame


class ChunkedCompressor(with_metaclass(ABCMeta, object)):
"""
Base class for all ChunkedCompressors
Expand Down Expand Up @@ -147,6 +155,53 @@ def decompress(self, data):
return self.decompressor.decompress(data)


class LZ4Compressor(ChunkedCompressor):
"""
A ChunkedCompressor implementation based on lz4
"""

def __init__(self):
self.lz4_frame = _try_import_lz4()
self.compressor = None
self.decompressor = None

def add_chunk(self, data):
"""
Compresses the supplied data and returns all the compressed bytes.
:param bytes data: The chunk of data to be compressed
:return: The compressed data
:rtype: bytes
"""
compressed = b""
if self.compressor is None:
self.compressor = self.lz4_frame.LZ4FrameCompressor()
compressed += self.compressor.begin()
compressed += self.compressor.compress(data)
return compressed

def flush(self):
return b""

def finish(self):
# flushing an lz4 compressor also renders it finished, so we flush in self.finish
# and don't flush in self.flush
return self.compressor.flush()

def decompress(self, data):
"""
Decompresses the supplied chunk of data and returns at least part of the
uncompressed data.
:param bytes data: The chunk of data to be decompressed
:return: The decompressed data
:rtype: bytes
"""
if self.decompressor is None:
self.decompressor = self.lz4_frame.LZ4FrameDecompressor()
return self.decompressor.decompress(data)


def get_compressor(compression):
"""
Helper function which returns a ChunkedCompressor for the specified compression
Expand All @@ -163,6 +218,8 @@ def get_compressor(compression):
return SnappyCompressor()
elif compression == "zstd":
return ZstdCompressor()
elif compression == "lz4":
return LZ4Compressor()
return None


Expand All @@ -188,6 +245,13 @@ def compress(wal_file, compression):
zstandard.ZstdCompressor().copy_stream(wal_file, in_mem_zstd)
in_mem_zstd.seek(0)
return in_mem_zstd
elif compression == "lz4":
in_mem_lz4 = BytesIO()
lz4_frame = _try_import_lz4()
with lz4_frame.LZ4FrameFile(in_mem_lz4, mode="wb") as compressed:
shutil.copyfileobj(wal_file, compressed)
in_mem_lz4.seek(0)
return in_mem_lz4
elif compression == "gzip":
# Create a BytesIO for in memory compression
in_mem_gzip = BytesIO()
Expand Down Expand Up @@ -218,7 +282,7 @@ def get_streaming_tar_mode(mode, compression):
:return: The full filemode for a streaming tar file
:rtype: str
"""
if compression == "snappy" or compression == "zstd" or compression is None:
if compression in ("snappy", "zstd", "lz4") or compression is None:
return "%s|" % mode
else:
return "%s|%s" % (mode, compression)
Expand All @@ -244,6 +308,9 @@ def decompress_to_file(blob, dest_file, compression):
zstandard = _try_import_zstandard()
zstandard.ZstdDecompressor().copy_stream(blob, dest_file)
return
if compression == "lz4":
lz4_frame = _try_import_lz4()
source_file = lz4_frame.LZ4FrameFile(blob, mode="rb")
elif compression == "gzip":
source_file = gzip.GzipFile(fileobj=blob, mode="rb")
elif compression == "bzip2":
Expand Down
11 changes: 11 additions & 0 deletions barman/clients/cloud_walarchive.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ def parse_arguments(args=None):
const="zstd",
dest="compression",
)
compression.add_argument(
"--lz4",
help="lz4-compress the WAL while uploading to the cloud "
"(requires optional lz4 library)",
action="store_const",
const="lz4",
dest="compression",
)
add_tag_argument(
parser,
name="tags",
Expand Down Expand Up @@ -330,6 +338,9 @@ def retrieve_wal_name(self, wal_path):
elif self.compression == "zstd":
# add zst extension
return "%s.zst" % wal_name
elif self.compression == "lz4":
# add lz4 extension
return "%s.lz4" % wal_name
else:
raise ValueError("Unknown compression type: %s" % self.compression)

Expand Down
5 changes: 5 additions & 0 deletions barman/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
".bz2": "bzip2",
".snappy": "snappy",
".zst": "zstd",
".lz4": "lz4",
}

DEFAULT_DELIMITER = "/"
Expand Down Expand Up @@ -324,6 +325,8 @@ def _build_dest_name(self, name, count=0):
components.append(".snappy")
elif self.compression == "zstd":
components.append(".zst")
elif self.compression == "lz4":
components.append(".lz4")
return "".join(components)

def _get_tar(self, name):
Expand Down Expand Up @@ -2234,6 +2237,8 @@ def get_backup_files(self, backup_info, allow_missing=False):
info.compression = "snappy"
elif ext == "tar.zst":
info.compression = "zstd"
elif ext == "tar.lz4":
info.compression = "lz4"
else:
logging.warning("Skipping unknown extension: %s", ext)
continue
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@
"google-cloud-compute", # requires minimum python3.7
],
"zstd": ["zstandard"],
"lz4": ["lz4"],
},
platforms=["Linux", "Mac OS X"],
classifiers=[
Expand Down

0 comments on commit 366847c

Please sign in to comment.