From af7f72632cbcbe14fea671e500210280a4e694db Mon Sep 17 00:00:00 2001 From: reivilibre Date: Fri, 15 Nov 2024 11:54:38 +0000 Subject: [PATCH] Add support for MIME block list instead of allow list (#7) I also document and test the (existing) behaviour with generic text and binary files. Closes https://github.com/element-hq/customer-success/issues/197 --- config.sample.yaml | 11 ++++ src/matrix_content_scanner/config.py | 14 ++++ src/matrix_content_scanner/scanner/scanner.py | 23 ++++++- tests/scanner/test_scanner.py | 66 ++++++++++++++++++- tests/testutils.py | 6 ++ 5 files changed, 117 insertions(+), 3 deletions(-) diff --git a/config.sample.yaml b/config.sample.yaml index f8d4f4e..954d343 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -36,9 +36,20 @@ scan: # List of allowed MIME types. If a file has a MIME type that's not in this list, its # scan is considered failed. + # Unrecognised binary files are considered to be `application/octet-stream`. + # Unrecognised text files are considered to be `text/plain`. # Optional, defaults to allowing all MIME types. allowed_mimetypes: ["image/jpeg"] + # List of blocked MIME types. + # If specified, `allowed_mimetypes` must not be specified as well. + # If specified, a file whose MIME type is on this list will produce a scan that is + # considered failed. + # Unrecognised binary files are considered to be `application/octet-stream`. + # Unrecognised text files are considered to be `text/plain`. + # Optional. + # blocked_mimetypes: ["image/jpeg"] + # Configuration of scan result caching. # # Results are stored in a cache to avoid having to download and scan a file twice. There diff --git a/src/matrix_content_scanner/config.py b/src/matrix_content_scanner/config.py index 777ef44..d396a2e 100644 --- a/src/matrix_content_scanner/config.py +++ b/src/matrix_content_scanner/config.py @@ -75,6 +75,7 @@ def _parse_size(size: Optional[Union[str, float]]) -> Optional[float]: "temp_directory": {"type": "string"}, "removal_command": {"type": "string"}, "allowed_mimetypes": {"type": "array", "items": {"type": "string"}}, + "blocked_mimetypes": {"type": "array", "items": {"type": "string"}}, }, }, "download": { @@ -131,6 +132,7 @@ class ScanConfig: temp_directory: str removal_command: str = "rm" allowed_mimetypes: Optional[List[str]] = None + blocked_mimetypes: Optional[List[str]] = None @attr.s(auto_attribs=True, frozen=True, slots=True) @@ -175,3 +177,15 @@ def __init__(self, config_dict: Dict[str, Any]): self.crypto = CryptoConfig(**(config_dict.get("crypto") or {})) self.download = DownloadConfig(**(config_dict.get("download") or {})) self.result_cache = ResultCacheConfig(**(config_dict.get("result_cache") or {})) + + # Don't allow both allowlist and blocklist for MIME types, since we do not document + # the semantics for that and it is in any case pointless. + # This could have been expressed in JSONSchema but I suspect the error message would be poor + # in that case. + if ( + self.scan.allowed_mimetypes is not None + and self.scan.blocked_mimetypes is not None + ): + raise ConfigError( + "Both `scan.allowed_mimetypes` and `scan.blocked_mimetypes` are specified, which is not allowed!" + ) diff --git a/src/matrix_content_scanner/scanner/scanner.py b/src/matrix_content_scanner/scanner/scanner.py index 39e3c51..53e9806 100644 --- a/src/matrix_content_scanner/scanner/scanner.py +++ b/src/matrix_content_scanner/scanner/scanner.py @@ -76,11 +76,18 @@ def __init__(self, mcs: "MatrixContentScanner"): self._max_size_to_cache = mcs.config.result_cache.max_file_size - # List of MIME types we should allow. If None, we don't fail files based on their + # List of MIME types we should allow. + # If None, we fall back to `_blocked_mimetypes`. + # If that's also None, we don't fail files based on their # MIME types (besides comparing it with the Content-Type header from the server # for unencrypted files). self._allowed_mimetypes = mcs.config.scan.allowed_mimetypes + # List of MIME types we should block. + # Must not be specified at the same time as `_allowed_mimetypes`. + # See the comment for `_allowed_mimetypes` for the semantics. + self._blocked_mimetypes = mcs.config.scan.blocked_mimetypes + # Cache of futures for files that are currently scanning and downloading, so that # concurrent requests don't cause a file to be downloaded and scanned twice. self._current_scans: Dict[str, Future[MediaDescription]] = {} @@ -505,3 +512,17 @@ def _check_mimetype(self, media_content: bytes) -> None: raise FileMimeTypeForbiddenError( f"File type: {detected_mimetype} not allowed" ) + + # If there's a block list for MIME types, check that the MIME type detected for + # this file is NOT in it. + if ( + self._blocked_mimetypes is not None + and detected_mimetype in self._blocked_mimetypes + ): + logger.error( + "MIME type for file is forbidden: %s", + detected_mimetype, + ) + raise FileMimeTypeForbiddenError( + f"File type: {detected_mimetype} not allowed" + ) diff --git a/tests/scanner/test_scanner.py b/tests/scanner/test_scanner.py index 8ac1bca..6ea61fd 100644 --- a/tests/scanner/test_scanner.py +++ b/tests/scanner/test_scanner.py @@ -22,8 +22,10 @@ from tests.testutils import ( ENCRYPTED_FILE_METADATA, MEDIA_PATH, + SMALL_BINARY_FILE, SMALL_PNG, SMALL_PNG_ENCRYPTED, + SMALL_TEXT_FILE, get_content_scanner, to_thumbnail_params, ) @@ -219,7 +221,7 @@ async def test_different_encryption_key(self) -> None: # But it also causes it to be downloaded again because its metadata have changed. self.assertEqual(self.downloader_mock.call_count, 2) - async def test_mimetype(self) -> None: + async def test_allowlist_mimetype(self) -> None: """Tests that, if there's an allow list for MIME types and the file's MIME type isn't in it, the file's scan fails. """ @@ -230,7 +232,7 @@ async def test_mimetype(self) -> None: with self.assertRaises(FileMimeTypeForbiddenError): await self.scanner.scan_file(MEDIA_PATH) - async def test_mimetype_encrypted(self) -> None: + async def test_allowlist_mimetype_encrypted(self) -> None: """Tests that the file's MIME type is correctly detected and compared with the allow list (if set), even if it's encrypted. """ @@ -243,6 +245,66 @@ async def test_mimetype_encrypted(self) -> None: with self.assertRaises(FileMimeTypeForbiddenError): await self.scanner.scan_file(MEDIA_PATH, ENCRYPTED_FILE_METADATA) + async def test_blocklist_mimetype(self) -> None: + """Tests that, if there's an allow list for MIME types and the file's MIME type + isn't in it, the file's scan fails. + """ + # Set a block list that blocks PNG images. + self.scanner._blocked_mimetypes = ["image/png"] + + # Check that the scan fails since the file is a PNG. + with self.assertRaises(FileMimeTypeForbiddenError): + await self.scanner.scan_file(MEDIA_PATH) + + async def test_blocklist_mimetype_encrypted(self) -> None: + """Tests that the file's MIME type is correctly detected and compared with the + allow list (if set), even if it's encrypted. + """ + self._setup_encrypted() + + # Set a block list that blocks PNG images. + self.scanner._blocked_mimetypes = ["image/png"] + + # Check that the scan fails since the file is a PNG. + with self.assertRaises(FileMimeTypeForbiddenError): + await self.scanner.scan_file(MEDIA_PATH, ENCRYPTED_FILE_METADATA) + + async def test_blocklist_mimetype_fallback_binary_file(self) -> None: + """Tests that unrecognised binary files' MIME type is assumed to be + `application/octet-stream` and that they can be blocked in this way. + """ + + self.downloader_res = MediaDescription( + # This is the *claimed* content-type by the uploader + content_type="application/vnd.io.element.generic_binary_file", + content=SMALL_BINARY_FILE, + response_headers=CIMultiDictProxy(CIMultiDict()), + ) + + # Set a block list that blocks uncategorised binary files. + self.scanner._blocked_mimetypes = ["application/octet-stream"] + + with self.assertRaises(FileMimeTypeForbiddenError): + await self.scanner.scan_file(MEDIA_PATH) + + async def test_blocklist_mimetype_fallback_text_file(self) -> None: + """Tests that unrecognised text files' MIME type is assumed to be + `text/plain` and that they can be blocked in this way. + """ + + self.downloader_res = MediaDescription( + # This is the *claimed* content-type by the uploader + content_type="application/vnd.io.element.generic_file", + content=SMALL_TEXT_FILE, + response_headers=CIMultiDictProxy(CIMultiDict()), + ) + + # Set a block list that blocks uncategorised text files. + self.scanner._blocked_mimetypes = ["text/plain"] + + with self.assertRaises(FileMimeTypeForbiddenError): + await self.scanner.scan_file(MEDIA_PATH) + async def test_dont_cache_exit_codes(self) -> None: """Tests that if the configuration specifies exit codes to ignore when running the scanning script, we don't cache them. diff --git a/tests/testutils.py b/tests/testutils.py index 6bd9649..f57e816 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -22,6 +22,12 @@ b"0a2db40000000049454e44ae426082" ) +# A small binary file without any specific format. +SMALL_BINARY_FILE = unhexlify(b"010203") + +# A small text file without any specific format. +SMALL_TEXT_FILE = b"Hello world\nThis is a tiny text file" + # A small, encrypted PNG. SMALL_PNG_ENCRYPTED = unhexlify( b"9fd28dd7a1d845a04948f13af104e39402c888f7b601bce313ad"