Skip to content

Commit

Permalink
Allow providing a location for fetch
Browse files Browse the repository at this point in the history
 -extends aboutcode-org#54
 -Added filename deduction (content-disposition/URL)
 -Fetch and its helper functions now use pathlib's Path

Signed-off-by: Mateusz Perc <[email protected]>
  • Loading branch information
quepop committed Jun 23, 2021
1 parent 42110e9 commit 36c9e89
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 21 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ six==1.14.0
urllib3==1.25.8
wcwidth==0.1.8
zipp==1.2.0
kiss-headers==2.3.0
72 changes: 51 additions & 21 deletions src/fetchcode/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

from ftplib import FTP
from mimetypes import MimeTypes
import os
import tempfile
from urllib.parse import urlparse
from kiss_headers import parse_it
from pathlib import Path

import requests
import tempfile


class Response:
Expand All @@ -41,14 +42,35 @@ def __init__(self, location, content_type, size, url):
def fetch_http(url, location):
    """
    Return a `Response` object built from fetching the content at a HTTP/HTTPS based `url` URL string
    Saving the content in a file at `location`

    `location` may be a string or a `Path`.
    If `location` is an existing directory - try to deduce the filename, trying in order:
    the Content-Disposition `filename*` parameter, its `filename` parameter,
    then the last segment of the URL path.
    If deduction failed, save the content in a temporary file created at a `location`

    The `location` stored on the returned `Response` is the `Path` of the file
    that was actually written.
    """
    # Callers (including `fetch`) may hand over a plain string; the directory
    # check and the `/` joins below need a real Path object.
    location = Path(location)

    r = requests.get(url)

    if location.is_dir():
        content_disposition = parse_it(r.headers).get("content-disposition") or {}
        # NOTE(review): a `filename*` value is presumably still in its RFC 5987
        # encoded form (charset'lang'percent-encoded) at this point - confirm
        # whether kiss_headers decodes it before relying on the result.
        candidates = (
            content_disposition.get("filename*"),
            content_disposition.get("filename"),
            Path(urlparse(url).path).name,
        )
        for candidate in candidates:
            if not isinstance(candidate, str):
                continue
            # The header is server-controlled: keep only the final path
            # component so a name such as "../../x" cannot escape `location`.
            filename = Path(candidate).name
            if filename and filename != "..":
                location = location / filename
                break
        else:
            # No usable name found: reserve a unique file inside the directory
            # and close the handle straight away; it is reopened below.
            with tempfile.NamedTemporaryFile(dir=location, delete=False) as temp:
                location = Path(temp.name)

    with open(location, "wb") as f:
        f.write(r.content)

    content_type = r.headers.get("content-type")
    size = r.headers.get("content-length")
    size = int(size) if size else None

    resp = Response(location=location, content_type=content_type, size=size, url=url)
    return resp
Expand All @@ -59,49 +81,57 @@ def fetch_http(url, location):
def fetch_ftp(url, location):
    """
    Return a `Response` object built from fetching the content at a FTP based `url` URL string
    Saving the content in a file at `location`

    `location` may be a string or a `Path`.
    If `location` is an existing directory - deduce the filename from the URL

    The content type is guessed from the filename and is None when unknown.
    The `location` stored on the returned `Response` is the `Path` of the file
    that was actually written.
    """
    # Local import: only this function needs OS-independent POSIX path handling.
    from pathlib import PurePosixPath

    # Callers may hand over a plain string; `is_dir()` and `/` need a Path.
    location = Path(location)

    url_parts = urlparse(url)
    netloc = url_parts.netloc
    # Remote FTP paths always use "/" separators, whatever the local OS is;
    # a native Path would turn them into backslashes on Windows.
    path = PurePosixPath(url_parts.path)
    directory = path.parent
    filename = path.name

    if location.is_dir():
        location = location / filename

    # guess_type() returns a (type, encoding) pair; the type is None when the
    # extension is not recognised.
    content_type, _ = MimeTypes().guess_type(filename)

    # The context manager closes the connection even if a command fails.
    with FTP(netloc) as ftp:
        ftp.login()
        size = ftp.size(str(path))
        ftp.cwd(str(directory))
        with open(location, "wb") as f:
            ftp.retrbinary("RETR {}".format(filename), f.write)

    resp = Response(location=location, content_type=content_type, size=size, url=url)
    return resp


def fetch(url, location=None):
    """
    Return a `Response` object built from fetching the content at the `url` URL string and store content at a provided `location`

    `location` may be a string or a `Path`.
    If `location` is None, save the content in a newly created temporary file
    If `location` is an existing directory - try to deduce the filename

    Supported URL schemes are ftp, http and https.
    Raise an Exception for any other scheme.
    """
    scheme = urlparse(url).scheme

    # Validate the scheme before touching the filesystem so that an
    # unsupported URL does not leave a stray temporary file behind.
    if scheme not in ("ftp", "http", "https"):
        raise Exception("Not a supported/known scheme.")

    if location is None:
        # Only the unique name is needed: close the handle right away so the
        # fetcher can reopen the file for writing.
        with tempfile.NamedTemporaryFile(delete=False) as temp:
            location = temp.name

    # The scheme-specific fetchers work with Path objects.
    location = Path(location)

    fetcher = fetch_ftp if scheme == "ftp" else fetch_http
    return fetcher(url, location)

0 comments on commit 36c9e89

Please sign in to comment.