Skip to content

Commit

Permalink
Merge pull request #13 from E-ARK-Software/feat/pydantic-types
Browse files Browse the repository at this point in the history
FEAT: Pydantic types
  • Loading branch information
carlwilson authored Feb 16, 2024
2 parents 59cd0bd + 8d4b6ac commit da7c45e
Show file tree
Hide file tree
Showing 37 changed files with 1,293 additions and 745 deletions.
42 changes: 25 additions & 17 deletions eark_validator/cli/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
Command line validation application
"""
import argparse
from pprint import pprint
import os.path
from pathlib import Path
import sys
from typing import Optional, Tuple
from eark_validator.model import StructResults

import eark_validator.structure as STRUCT
import eark_validator.packages as PACKAGES
from eark_validator.infopacks.package_handler import PackageHandler

__version__ = '0.1.0'

Expand Down Expand Up @@ -98,28 +101,33 @@ def main():
_exit = _loop_exit if (_loop_exit > 0) else _exit
sys.exit(_exit)

def _validate_ip(info_pack):
ret_stat = _check_path(info_pack)
struct_details = STRUCT.validate_package_structure(info_pack)
pprint('Path {}, struct result is: {}'.format(info_pack,
struct_details.status))
for error in struct_details.errors:
pprint(error.to_json())
def _validate_ip(path: str) -> Tuple[int, Optional[StructResults]]:
ret_stat, checked_path = _check_path(path)
if ret_stat > 0:
return ret_stat, None
report = PACKAGES.PackageValidator(checked_path).validation_report
print('Path {}, struct result is: {}'.format(checked_path,
report.structure.status.value))
for message in report.structure.messages:
print(message.model_dump_json())

return ret_stat, struct_details
return ret_stat, report.structure

def _check_path(path):
def _check_path(path: str) -> Tuple[int, Optional[Path]]:
if not os.path.exists(path):
# Skip files that don't exist
pprint('Path {} does not exist'.format(path))
return 1
print(_format_check_path_message(path, 'does not exist'))
return 1, None
if os.path.isfile(path):
# Check if file is an archive format
if not STRUCT.ArchivePackageHandler.is_archive(path):
if not PackageHandler.is_archive(path):
# If not we can't process so report and iterate
pprint('Path {} is not a file we can process.'.format(path))
return 2
return 0
print(_format_check_path_message(path, 'is not an archive file or directory'))
return 2, None
return 0, Path(path)

def _format_check_path_message(path: Path, message: str) -> str:
return 'Processing terminated, path: {} {}.'.format(path, message)

# def _test_case_schema_checks():
if __name__ == '__main__':
Expand Down
89 changes: 22 additions & 67 deletions eark_validator/infopacks/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,86 +31,41 @@

from eark_validator.ipxml.schema import Namespaces
from eark_validator.const import NO_PATH, NOT_DIR, NOT_FILE
@unique
class HashAlgorithms(Enum):
"""Enum covering information package validation statuses."""
MD5 = 'MD5'
SHA1 = 'SHA-1'
SHA256 = 'SHA-256'
SHA384 = 'SHA-384'
SHA512 = 'SHA-512'
from eark_validator.model import Checksum, ChecksumAlg

class Checksummer:
def __init__(self, algorithm: ChecksumAlg):
self._algorithm = algorithm

@property
def algorithm(self) -> ChecksumAlg:
"""Get the algorithm."""
return self._algorithm

def hash_file(self, path: str) -> 'Checksum':
if (not os.path.exists(path)):
raise FileNotFoundError(NO_PATH.format(path))
if (not os.path.isfile(path)):
raise ValueError(NOT_FILE.format(path))
implemenation = self.get_implementation(self)
implemenation = ChecksumAlg.get_implementation(self._algorithm)
with open(path, 'rb') as file:
for chunk in iter(lambda: file.read(4096), b''):
implemenation.update(chunk)
return Checksum(self, implemenation.hexdigest())

@classmethod
def from_string(cls, value: str) -> 'HashAlgorithms':
search_value = value.upper() if hasattr(value, 'upper') else value
for algorithm in cls:
if (algorithm.value == search_value) or (algorithm.name == search_value) or (algorithm == value):
return algorithm
return None

@classmethod
def get_implementation(cls, algorithm: 'HashAlgorithms'):
if algorithm not in cls:
algorithm = cls.from_string(algorithm)
if algorithm is None:
raise ValueError('Algorithm {} not supported.'.format(algorithm))
algorithms = {
cls.MD5: hashlib.md5(),
cls.SHA1: hashlib.sha1(),
cls.SHA256: hashlib.sha256(),
cls.SHA384: hashlib.sha384(),
cls.SHA512: hashlib.sha512()
}
return algorithms.get(algorithm)


class Checksum:
def __init__(self, algorithm: HashAlgorithms, value: str):
self._algorithm = algorithm
self._value = value.lower()

@property
def algorithm(self) -> HashAlgorithms:
"""Get the algorithm."""
return self._algorithm

@property
def value(self) -> str:
"""Get the value."""
return self._value

def is_value(self, value: 'Checksum') -> bool:
"""Check if the checksum value is equal to the given value."""
if isinstance(value, Checksum):
return (self._value == value.value) and (self._algorithm == value.algorithm)
return self._value == value.lower()
return Checksum(algorithm=self._algorithm, value=implemenation.hexdigest().upper())

@classmethod
def from_mets_element(cls, element: ET.Element) -> 'Checksum':
"""Create a Checksum from an etree element."""
# Read the CHECKSUMTYPE and CHECKSUM attributes from the element.
algorithm = HashAlgorithms.from_string(element.attrib['CHECKSUMTYPE'])
algorithm = ChecksumAlg.from_string(element.attrib['CHECKSUMTYPE'])
value = element.attrib['CHECKSUM']
return cls(algorithm, value)
return Checksum(algorithm=algorithm, value=value.upper())

@classmethod
def from_file(cls, path: str, algorithm: 'Checksum') -> 'Checksum':
def from_file(cls, path: str, algorithm: 'ChecksumAlg') -> 'Checksum':
"""Create a Checksum by hashing the file at path with the given algorithm."""
# Hash the file's contents using the requested algorithm.
algorithm = HashAlgorithms.from_string(algorithm)
return algorithm.hash_file(path)

return Checksummer(algorithm).hash_file(path)

class FileItem:
def __init__(self, path: str, size: int, checksum: Checksum, mime: str):
Expand Down Expand Up @@ -166,19 +121,19 @@ def from_element(cls, element: ET.Element) -> 'FileItem':
raise ValueError('Element {} is not a METS:file or METS:mdRef element.'.format(element.tag))
size = int(element.attrib['SIZE'])
mime = element.attrib['MIMETYPE']
checksum = Checksum.from_mets_element(element)
checksum = Checksummer.from_mets_element(element)
return cls(path, size, checksum, mime)

@classmethod
def from_file_path(cls, path: str, mime:str=None, checksum_algorithm:HashAlgorithms=None) -> 'FileItem':
def from_file_path(cls, path: str, mime:str=None, checksum_algorithm:ChecksumAlg=None) -> 'FileItem':
"""Create a FileItem from a file path."""
if (not os.path.exists(path)):
raise FileNotFoundError(NO_PATH.format(path))
if (not os.path.isfile(path)):
raise ValueError('Path {} is not a file.'.format(path))
size = os.path.getsize(path)
mimetype = mime or 'application/octet-stream'
checksum = Checksum.from_file(path, checksum_algorithm) if checksum_algorithm else None
checksum = Checksummer.from_file(path, checksum_algorithm) if checksum_algorithm else None
return cls(path, size, checksum, mimetype)

class Manifest:
Expand Down Expand Up @@ -228,8 +183,8 @@ def check_integrity(self) -> tuple[bool, list[str]]:
if (item.size != os.path.getsize(abs_path)):
issues.append('File {} manifest size {}, filesystem size {}.'.format(item.path, item.size, os.path.getsize(abs_path)))
is_valid = False
calced_checksum = item.checksum.algorithm.hash_file(abs_path)
if (not item.checksum.is_value(calced_checksum)):
calced_checksum = Checksummer.from_file(abs_path, item.checksum.algorithm)
if not item.checksum == calced_checksum:
issues.append('File {} manifest checksum {}, calculated checksum {}.'.format(item.path, item.checksum, calced_checksum))
is_valid = False
return is_valid, issues
Expand All @@ -239,7 +194,7 @@ def _relative_path(root_path: str, path: str) -> str:
return path if not os.path.isabs(path) else os.path.relpath(path, root_path)

@classmethod
def from_directory(cls, root_path: str, checksum_algorithm: HashAlgorithms=None) -> 'Manifest':
def from_directory(cls, root_path: str, checksum_algorithm: ChecksumAlg=None) -> 'Manifest':
if (not os.path.exists(root_path)):
raise FileNotFoundError(NO_PATH.format(root_path))
if (not os.path.isdir(root_path)):
Expand Down
95 changes: 95 additions & 0 deletions eark_validator/infopacks/package_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# E-ARK Validation
# Copyright (C) 2019
# All rights reserved.
#
# Licensed to the E-ARK project under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The E-ARK project licenses
# this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
Factory methods for the package classes.
"""
import os
from pathlib import Path
import tarfile
import tempfile
import zipfile
from eark_validator.infopacks.manifest import Checksummer
# Message templates for the ValueErrors raised by PackageHandler.
SUB_MESS_NOT_EXIST = 'Path {} does not exist'
SUB_MESS_NOT_ARCH = 'Parameter "to_unpack": {} does not reference a file of known archive format (zip or tar).'


class PackageError(Exception):
    """Exception used to mark validation error when unpacking archive."""


class PackageHandler():
    """Class to handle archive / compressed information packages."""

    def __init__(self, unpack_root: Path=Path(tempfile.gettempdir())):
        """Create a handler that unpacks archives beneath ``unpack_root``.

        Args:
            unpack_root: directory used as the parent for unpacked archives
                when no explicit destination is given (defaults to the
                system temporary directory).
        """
        self._unpack_root: Path = unpack_root

    @property
    def unpack_root(self) -> Path:
        """Returns the root directory for archive unpacking."""
        return self._unpack_root

    def prepare_package(self, to_prepare: Path, dest: Path=None) -> Path:
        """Return a directory holding the package found at ``to_prepare``.

        A directory is returned unchanged; anything else is handed to
        ``unpack_package``.

        Args:
            to_prepare: path to a package directory or archive file.
            dest: optional parent directory for unpacking archives.

        Raises:
            ValueError: if ``to_prepare`` does not exist, or is a file that
                is not a recognised archive.
            PackageError: if the unpacked archive does not contain exactly
                one top-level directory.
        """
        if not os.path.exists(to_prepare):
            raise ValueError(SUB_MESS_NOT_EXIST.format(to_prepare))
        if os.path.isdir(to_prepare):
            return to_prepare
        return self.unpack_package(to_prepare, dest)

    def unpack_package(self, to_unpack: Path, dest: Path=None) -> Path:
        """Unpack an archived package to a destination (defaults to tempdir).

        The archive is extracted into a sub-directory of the destination
        named after the archive's SHA-1 checksum value.

        Args:
            to_unpack: path to a zip or tar archive.
            dest: optional parent directory; ``unpack_root`` is used when
                this is not supplied.

        Returns:
            The absolute path of the single directory the archive contains.

        Raises:
            ValueError: if ``to_unpack`` is not a file of a known archive type.
            PackageError: if the archive does not unpack to exactly one
                top-level directory.
        """
        # is_archive() already returns False for anything that is not a file.
        if not self.is_archive(to_unpack):
            raise ValueError(SUB_MESS_NOT_ARCH.format(to_unpack))
        sha1 = Checksummer('SHA-1').hash_file(to_unpack)
        dest_root = dest if dest else self.unpack_root
        destination = Path(dest_root) / sha1.value
        self._unpack(to_unpack, destination)
        return self._package_root(destination)

    @staticmethod
    def _package_root(destination: Path) -> Path:
        """Return the single top-level directory found inside ``destination``.

        Raises:
            PackageError: if ``destination`` holds anything other than
                exactly one directory.
        """
        children = list(Path(destination).iterdir())
        if len(children) != 1:
            # Dir unpacks to more than a single folder
            raise PackageError('Unpacking archive yields '
                               '{} children.'.format(len(children)))
        if not children[0].is_dir():
            raise PackageError('Unpacking archive yields '
                               'a single file child {}.'.format(children[0]))
        return children[0].absolute()

    @staticmethod
    def _unpack(to_unpack: Path, destination: Path):
        """Extract a zip or tar archive into ``destination``.

        Nothing is extracted when the file is neither a zip nor a tar archive.
        """
        if zipfile.is_zipfile(to_unpack):
            with zipfile.ZipFile(to_unpack) as zip_ip:
                zip_ip.extractall(path=destination)
        elif tarfile.is_tarfile(to_unpack):
            with tarfile.open(to_unpack) as tar_ip:
                # Archives are user supplied, so use the 'data' extraction
                # filter where the running Python provides it; this refuses
                # members that would be written outside the destination.
                if hasattr(tarfile, 'data_filter'):
                    tar_ip.extractall(path=destination, filter='data')
                else:
                    # NOTE(review): interpreters without extraction filters
                    # extract members unchecked - only unpack trusted archives.
                    tar_ip.extractall(path=destination)

    @staticmethod
    def is_archive(to_test: Path) -> bool:
        """Return True if the file is a recognised archive type, False otherwise."""
        if not os.path.isfile(to_test):
            return False
        return zipfile.is_zipfile(to_test) or tarfile.is_tarfile(to_test)
7 changes: 2 additions & 5 deletions eark_validator/ipxml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
# under the License.
#
"""
E-ARK : Information package validation
Information Package modules
E-ARK : Information Package Validation
Information Package XML module
"""
from .resources import profiles as PROFILES
from .resources import schema as SCHEMA
from .resources import schematron as SCHEMATRON
2 changes: 0 additions & 2 deletions eark_validator/ipxml/namespaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
Information Package modules
"""
from enum import Enum, unique
from lxml import etree
from importlib_resources import files

@unique
class Namespaces(Enum):
Expand Down
7 changes: 2 additions & 5 deletions eark_validator/ipxml/resources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
# under the License.
#
"""
E-ARK : Information package validation
Information Package modules
E-ARK : Information Package Validation
Information Package XML vocabularies
"""
from . import profiles as PROFILES
from . import schema as SCHEMA
from . import schematron as SCHEMATRON
28 changes: 28 additions & 0 deletions eark_validator/ipxml/resources/profiles/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# E-ARK Validation
# Copyright (C) 2019
# All rights reserved.
#
# Licensed to the E-ARK project under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The E-ARK project licenses
# this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
E-ARK : Information Package validation
Information Package XML METS Profiles
"""
28 changes: 28 additions & 0 deletions eark_validator/ipxml/resources/schema/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# E-ARK Validation
# Copyright (C) 2019
# All rights reserved.
#
# Licensed to the E-ARK project under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The E-ARK project licenses
# this file to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
E-ARK : Information Package Validation
Information Package XML schema
"""
Loading

0 comments on commit da7c45e

Please sign in to comment.