From 640e8744132dbf002c35ddc76c7e757857ebd743 Mon Sep 17 00:00:00 2001 From: Mateusz Perc Date: Fri, 11 Jun 2021 03:30:09 +0200 Subject: [PATCH 1/2] Add providing location for fetch -extends #54 -Added filename deduction (content-disposition/URL) -Fetch and its helper functions now use pathlib's Path Signed-off-by: Mateusz Perc --- requirements.txt | 1 + src/fetchcode/__init__.py | 73 ++++++++++++++++++++++++++++----------- 2 files changed, 53 insertions(+), 21 deletions(-) diff --git a/requirements.txt b/requirements.txt index d3117892..91431565 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,3 +16,4 @@ six==1.14.0 urllib3==1.25.8 wcwidth==0.1.8 zipp==1.2.0 +kiss-headers==2.3.0 diff --git a/src/fetchcode/__init__.py b/src/fetchcode/__init__.py index c573991f..acb72de6 100644 --- a/src/fetchcode/__init__.py +++ b/src/fetchcode/__init__.py @@ -16,11 +16,13 @@ from ftplib import FTP from mimetypes import MimeTypes -import os -import tempfile +from pathlib import Path +from pathlib import PurePosixPath from urllib.parse import urlparse +from kiss_headers import parse_it import requests +import tempfile class Response: @@ -41,14 +43,35 @@ def __init__(self, location, content_type, size, url): def fetch_http(url, location): """ Return a `Response` object built from fetching the content at a HTTP/HTTPS based `url` URL string - saving the content in a file at `location` + Saving the content in a file at `location` + If `location` is an existing directory - try to deduce the filename + If deduction failed, save the content in a temporary file created at a `location` """ r = requests.get(url) - with open(location, 'wb') as f: + + if Path.is_dir(location): + content_disposition = parse_it(r.headers).get("content-disposition") or {} + filename_priority = [ + content_disposition.get("filename*"), + content_disposition.get("filename"), + PurePosixPath(urlparse(url).path).name, + ] + filename_found = False + for filename in filename_priority: + if filename is not None and len(filename): + filename_found = True + location /= filename + break + if not filename_found: + location = Path( + tempfile.NamedTemporaryFile(dir=location, delete=False).name + ) + + with open(location, "wb") as f: f.write(r.content) - content_type = r.headers.get('content-type') - size = r.headers.get('content-length') + content_type = r.headers.get("content-type") + size = r.headers.get("content-length") size = int(size) if size else None resp = Response(location=location, content_type=content_type, size=size, url=url) @@ -59,49 +82,57 @@ def fetch_http(url, location): def fetch_ftp(url, location): """ Return a `Response` object built from fetching the content at a FTP based `url` URL string - saving the content in a file at `location` + Saving the content in a file at `location` + If `location` is an existing directory - deduce the filename from the URL """ url_parts = urlparse(url) netloc = url_parts.netloc - path = url_parts.path - dir, file = os.path.split(path) + path = PurePosixPath(url_parts.path) + directory = path.parent + filename = path.name + + if Path.is_dir(location): + location /= filename ftp = FTP(netloc) ftp.login() - size = ftp.size(path) + size = ftp.size(str(path)) mime = MimeTypes() - mime_type = mime.guess_type(file) + mime_type = mime.guess_type(filename) if mime_type: content_type = mime_type[0] else: content_type = None - ftp.cwd(dir) - file = 'RETR {}'.format(file) - with open(location, 'wb') as f: - ftp.retrbinary(file, f.write) + ftp.cwd(str(directory)) + filename = "RETR {}".format(filename) + with open(location, "wb") as f: + ftp.retrbinary(filename, f.write) ftp.close() resp = Response(location=location, content_type=content_type, size=size, url=url) return resp -def fetch(url): +def fetch(url, location=None): """ - Return a `Response` object built from fetching the content at the `url` URL string and store content at a temporary file. + Return a `Response` object built from fetching the content at the `url` URL string and store content at a provided `location` + If `location` is None, save the content in a newly created temporary file + If `location` is an existing directory - try to deduce the filename """ - temp = tempfile.NamedTemporaryFile(delete=False) - location = temp.name + if location is None: + temp = tempfile.NamedTemporaryFile(delete=False) + location = Path(temp.name) url_parts = urlparse(url) scheme = url_parts.scheme - fetchers = {'ftp': fetch_ftp, 'http': fetch_http, 'https': fetch_http} + fetchers = {"ftp": fetch_ftp, "http": fetch_http, "https": fetch_http} if scheme in fetchers: return fetchers.get(scheme)(url, location) - raise Exception('Not a supported/known scheme.') + raise Exception("Not a supported/known scheme.") From ac2461bdf7a249d8815987b4d421dbc615c043b9 Mon Sep 17 00:00:00 2001 From: Mateusz Perc Date: Thu, 24 Jun 2021 02:36:24 +0200 Subject: [PATCH 2/2] Add tests for location providing -Testing filename deduction for http/https/ftp schemes Signed-off-by: Mateusz Perc --- tests/test_fetch.py | 87 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 18 deletions(-) diff --git a/tests/test_fetch.py b/tests/test_fetch.py index 09b0bf4a..f001751c 100644 --- a/tests/test_fetch.py +++ b/tests/test_fetch.py @@ -14,52 +14,103 @@ # CONDITIONS OF ANY KIND, either express or implied. See the License for the # specific language governing permissions and limitations under the License. +from fetchcode import fetch +from pathlib import Path from unittest import mock import pytest -from fetchcode import fetch +FILENAMES = ["a.ext", "b.ext", "c.ext"] +HTTP_URL = "http://example.com/" +FTP_URL = "ftp://example.com/" + + +@mock.patch("fetchcode.requests.get") +def test_fetch_http_with_filename_provided(mock_get): + with mock.patch("fetchcode.open", mock.mock_open()) as mocked_file: + location = Path.home() / FILENAMES[0] + response = fetch(HTTP_URL, location) + assert response is not None + assert response.location == location + + +@pytest.mark.parametrize( + "parameters, expected_filename", + [ + pytest.param(f'filename*="{FILENAMES[0]}"; filename=""', FILENAMES[0]), + pytest.param(f'filename*=""; filename="{FILENAMES[1]}"', FILENAMES[1]), + pytest.param(f'filename*=""; filename=""', FILENAMES[2]), + ], +) +@mock.patch("fetchcode.requests.get") +def test_fetch_http_with_filename_deduction(mock_get, parameters, expected_filename): + mock_get.return_value.headers = {"content-disposition": f"attachment; {parameters}"} + with mock.patch("fetchcode.open", mock.mock_open()) as mocked_file: + location = Path.home() + response = fetch(HTTP_URL + FILENAMES[2], location) + assert response is not None + assert response.location == (location / expected_filename) -@mock.patch('fetchcode.requests.get') + +@mock.patch("fetchcode.tempfile.NamedTemporaryFile") +@mock.patch("fetchcode.requests.get") +def test_fetch_http_filename_deduction_failed(mock_get, mock_tmp): + location = Path.home() + mock_get.return_value.headers = {} + mock_tmp.return_value.name = location / FILENAMES[0] + with mock.patch("fetchcode.open", mock.mock_open()) as mocked_file: + response = fetch(HTTP_URL, location) + assert response is not None + assert response.location == (location / FILENAMES[0]) + + +@mock.patch("fetchcode.requests.get") def test_fetch_http_with_tempfile(mock_get): mock_get.return_value.headers = { - 'content-type': 'image/png', - 'content-length': '1000999', + "content-type": "image/png", + "content-length": "1000999", } - - with mock.patch('fetchcode.open', mock.mock_open()) as mocked_file: - url = 'https://raw.githubusercontent.com/TG1999/converge/master/assets/Group%2022.png' + with mock.patch("fetchcode.open", mock.mock_open()) as mocked_file: + url = "https://raw.githubusercontent.com/TG1999/converge/master/assets/Group%2022.png" response = fetch(url=url) assert response is not None assert 1000999 == response.size assert url == response.url - assert 'image/png' == response.content_type + assert "image/png" == response.content_type -@mock.patch('fetchcode.FTP') +@mock.patch("fetchcode.FTP") def test_fetch_with_wrong_url(mock_get): with pytest.raises(Exception) as e_info: - url = 'ftp://speedtest/1KB.zip' + url = "ftp://speedtest/1KB.zip" response = fetch(url=url) - assert 'Not a valid URL' == e_info + assert "Not a valid URL" == e_info -@mock.patch('fetchcode.FTP', autospec=True) +@mock.patch("fetchcode.FTP", autospec=True) def test_fetch_ftp_with_tempfile(mock_ftp_constructor): mock_ftp = mock_ftp_constructor.return_value mock_ftp_constructor.return_value.size.return_value = 1024 - with mock.patch('fetchcode.open', mock.mock_open()) as mocked_file: - response = fetch('ftp://speedtest.tele2.net/1KB.zip') + with mock.patch("fetchcode.open", mock.mock_open()) as mocked_file: + response = fetch("ftp://speedtest.tele2.net/1KB.zip") assert 1024 == response.size - mock_ftp_constructor.assert_called_with('speedtest.tele2.net') + mock_ftp_constructor.assert_called_with("speedtest.tele2.net") assert mock_ftp.login.called == True - mock_ftp.cwd.assert_called_with('/') + mock_ftp.cwd.assert_called_with("/") assert mock_ftp.retrbinary.called +@mock.patch("fetchcode.FTP") +def test_fetch_ftp_with_filename_deduction(mock_ftp): + with mock.patch("fetchcode.open", mock.mock_open()) as mocked_file: + location = Path.home() + response = fetch(FTP_URL + FILENAMES[0], location) + assert response.location == (location / FILENAMES[0]) + + def test_fetch_with_scheme_not_present(): with pytest.raises(Exception) as e_info: - url = 'abc://speedtest/1KB.zip' + url = "abc://speedtest/1KB.zip" response = fetch(url=url) - assert 'Not a supported/known scheme.' == e_info + assert "Not a supported/known scheme." == e_info