diff --git a/beetsplug/_typing.py b/beetsplug/_typing.py
new file mode 100644
index 0000000000..1aa288cbcb
--- /dev/null
+++ b/beetsplug/_typing.py
@@ -0,0 +1,115 @@
+from __future__ import annotations
+
+from typing import Any
+
+from typing_extensions import NotRequired, TypeAlias, TypedDict
+
+JSONDict: TypeAlias = "dict[str, Any]"
+
+
+class LRCLibAPI:
+    class Item(TypedDict):
+        """Lyrics data item returned by the LRCLib API."""
+
+        id: int
+        name: str
+        trackName: str
+        artistName: str
+        albumName: str
+        duration: float | None
+        instrumental: bool
+        plainLyrics: str
+        syncedLyrics: str | None
+
+
+class GeniusAPI:
+    """Genius API data types.
+
+    This documents *only* the fields that are used in the plugin.
+    :attr:`SearchResult` is an exception: some of its other fields may be
+    useful in the future.
+    """
+
+    class DateComponents(TypedDict):
+        year: int
+        month: int
+        day: int
+
+    class Artist(TypedDict):
+        api_path: str
+        header_image_url: str
+        id: int
+        image_url: str
+        is_meme_verified: bool
+        is_verified: bool
+        name: str
+        url: str
+
+    class Stats(TypedDict):
+        unreviewed_annotations: int
+        hot: bool
+
+    class SearchResult(TypedDict):
+        annotation_count: int
+        api_path: str
+        artist_names: str
+        full_title: str
+        header_image_thumbnail_url: str
+        header_image_url: str
+        id: int
+        lyrics_owner_id: int
+        lyrics_state: str
+        path: str
+        primary_artist_names: str
+        pyongs_count: int | None
+        relationships_index_url: str
+        release_date_components: GeniusAPI.DateComponents
+        release_date_for_display: str
+        release_date_with_abbreviated_month_for_display: str
+        song_art_image_thumbnail_url: str
+        song_art_image_url: str
+        stats: GeniusAPI.Stats
+        title: str
+        title_with_featured: str
+        url: str
+        featured_artists: list[GeniusAPI.Artist]
+        primary_artist: GeniusAPI.Artist
+        primary_artists: list[GeniusAPI.Artist]
+
+    class SearchHit(TypedDict):
+        result: GeniusAPI.SearchResult
+
+    class SearchResponse(TypedDict):
+        hits: list[GeniusAPI.SearchHit]
+
+    class Search(TypedDict):
+        response: GeniusAPI.SearchResponse
+
+
+class GoogleCustomSearchAPI:
+    class Response(TypedDict):
+        """Search response from the Google Custom Search API.
+
+        If the search returns no results, the :attr:`items` field is absent.
+        """
+
+        items: NotRequired[list[GoogleCustomSearchAPI.Item]]
+
+    class Item(TypedDict):
+        """A Google Custom Search API result item.
+
+        The :attr:`title` field is shown to the user in the search interface,
+        so longer titles get truncated with an ellipsis. For most results,
+        the full title is available in the ``og:title`` metatag found under
+        the :attr:`pagemap` field. Note that neither this metatag nor the
+        ``pagemap`` field is guaranteed to be present in the data.
+        """
+
+        title: str
+        link: str
+        pagemap: NotRequired[GoogleCustomSearchAPI.Pagemap]
+
+    class Pagemap(TypedDict):
+        """Pagemap data with a single meta tags dict in a list."""
+
+        metatags: list[JSONDict]
diff --git a/beetsplug/lyrics.py b/beetsplug/lyrics.py
index e6ab217c5b..2ef1ac2f30 100644
--- a/beetsplug/lyrics.py
+++ b/beetsplug/lyrics.py
@@ -16,52 +16,35 @@
 from __future__ import annotations
 
-import difflib
+import atexit
 import errno
 import itertools
-import json
+import math
 import os.path
 import re
-import struct
-import unicodedata
-import warnings
-from contextlib import suppress
+from contextlib import contextmanager, suppress
 from dataclasses import dataclass
 from functools import cached_property, partial, total_ordering
+from html import unescape
 from http import HTTPStatus
-from typing import TYPE_CHECKING, ClassVar, Iterable, Iterator
-from urllib.parse import quote, urlencode
+from typing import TYPE_CHECKING, ClassVar, Iterable, Iterator, NamedTuple
+from urllib.parse import quote, urlparse
 
+import langdetect
 import requests
-from typing_extensions import TypedDict
+from bs4 import BeautifulSoup
 from unidecode import unidecode
 
 import beets
 from beets import plugins, ui
+from beets.autotag.hooks import string_dist
 
 if TYPE_CHECKING:
     from beets.importer import ImportTask
     from beets.library import Item
 
-try:
-    import bs4
-    from bs4 import SoupStrainer
+    from ._typing import GeniusAPI, GoogleCustomSearchAPI, LRCLibAPI
 
-    HAS_BEAUTIFUL_SOUP = True
-except ImportError:
-    HAS_BEAUTIFUL_SOUP = False
-
-try:
-    import langdetect
-
-    HAS_LANGDETECT = True
-except ImportError:
-    HAS_LANGDETECT = False
-
-DIV_RE = re.compile(r"<(/?)div>?", re.I)
-COMMENT_RE = re.compile(r"<!--.*-->", re.S)
-TAG_RE = re.compile(r"<[^>]*>")
-BREAK_RE = re.compile(r"\n?\s*<br([\s|/][^>]*)*>\s*\n?", re.I)
 USER_AGENT = f"beets/{beets.__version__}"
 INSTRUMENTAL_LYRICS = "[Instrumental]"
@@ -105,37 +88,36 @@ class NotFoundError(requests.exceptions.HTTPError):
     pass
 
 
-# Utilities.
+class CaptchaError(requests.exceptions.HTTPError):
+    pass
+
+
+class TimeoutSession(requests.Session):
+    def request(self, *args, **kwargs):
+        """Wrap the request method to raise an exception on HTTP errors."""
+        kwargs.setdefault("timeout", 10)
+        r = super().request(*args, **kwargs)
+        if r.status_code == HTTPStatus.NOT_FOUND:
+            raise NotFoundError("HTTP Error: Not Found", response=r)
+        if 300 <= r.status_code < 400:
+            raise CaptchaError("Captcha is required", response=r)
+        r.raise_for_status()
 
-def unichar(i):
-    try:
-        return chr(i)
-    except ValueError:
-        return struct.pack("i", i).decode("utf-32")
+        return r
 
 
-def unescape(text):
-    """Resolve &#xxx; HTML entities (and some others)."""
-    if isinstance(text, bytes):
-        text = text.decode("utf-8", "ignore")
-    out = text.replace("&nbsp;", " ")
+r_session = TimeoutSession()
+r_session.headers.update({"User-Agent": USER_AGENT})
 
-    def replchar(m):
-        num = m.group(1)
-        return unichar(int(num))
 
-    out = re.sub("&#(\\d+);", replchar, out)
-    return out
+@atexit.register
+def close_session():
+    """Close the requests session on shut down."""
+    r_session.close()
 
 
-def extract_text_between(html, start_marker, end_marker):
-    try:
-        _, html = html.split(start_marker, 1)
-        html, _ = html.split(end_marker, 1)
-    except ValueError:
-        return ""
-    return html
+# Utilities.
 
 
 def search_pairs(item):
@@ -204,7 +186,7 @@
     return itertools.product(artists, multi_titles)
 
 
-def slug(text):
+def slug(text: str) -> str:
     """Make a URL-safe, human-readable version of the given text
 
     This will do the following:
@@ -214,81 +196,69 @@ def slug(text):
     3. strip whitespace
     4. replace other non-word characters with dashes
     5. strip extra dashes
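+
+    Example (illustrative): ``slug("Café Del Mar!")`` returns
+    ``"cafe-del-mar"``.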
-
-    This somewhat duplicates the :func:`Google.slugify` function but
-    slugify is not as generic as this one, which can be reused
-    elsewhere.
     """
     return re.sub(r"\W+", "-", unidecode(text).lower().strip()).strip("-")
 
 
-if HAS_BEAUTIFUL_SOUP:
+class RequestHandler:
+    _log: beets.logging.Logger
 
-    def try_parse_html(html, **kwargs):
-        return bs4.BeautifulSoup(html, "html.parser", **kwargs)
+    def debug(self, message: str, *args) -> None:
+        """Log a debug message with the class name."""
+        self._log.debug(f"{self.__class__.__name__}: {message}", *args)
 
-else:
+    def info(self, message: str, *args) -> None:
+        """Log an info message with the class name."""
+        self._log.info(f"{self.__class__.__name__}: {message}", *args)
 
-    def try_parse_html(html, **kwargs):
-        return None
+    def warn(self, message: str, *args) -> None:
+        """Log a warning message with the class name."""
+        self._log.warning(f"{self.__class__.__name__}: {message}", *args)
 
+    def fetch_text(self, url: str, **kwargs) -> str:
+        """Return text / HTML data from the given URL.
 
-class Backend:
-    REQUIRES_BS = False
+        Set the encoding to None to let requests handle it, because some
+        sites set it incorrectly.
+        """
+        self.debug("Fetching HTML from {}", url)
+        r = r_session.get(url, **kwargs)
+        r.encoding = None
+        return r.text
+
+    def fetch_json(self, url: str, **kwargs):
+        """Return JSON data from the given URL."""
+        self.debug("Fetching JSON from {}", url)
+        return r_session.get(url, **kwargs).json()
+
+    @contextmanager
+    def handle_request(self) -> Iterator[None]:
+        try:
+            yield
+        except requests.JSONDecodeError:
+            self.warn("Could not decode response JSON data")
+        except requests.RequestException as exc:
+            self.warn("Request error: {}", exc)
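+
+    # Typical use (illustrative, not part of this change): backends wrap
+    # network access as
+    #
+    #     with self.handle_request():
+    #         return self.fetch_json(url)
+    #
+    # so request and JSON-decoding errors are logged instead of propagating.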
+
+
+class BackendClass(type):
+    @property
+    def name(cls) -> str:
+        """Return lowercase name of the backend class."""
+        return cls.__name__.lower()
+
 
+class Backend(RequestHandler, metaclass=BackendClass):
     def __init__(self, config, log):
         self._log = log
         self.config = config
 
-    def fetch_url(self, url, **kwargs):
-        """Retrieve the content at a given URL, or return None if the source
-        is unreachable.
-        """
-        try:
-            # Disable the InsecureRequestWarning that comes from using
-            # `verify=false`.
-            # https://github.com/kennethreitz/requests/issues/2214
-            # We're not overly worried about the NSA MITMing our lyrics scraper
-            with warnings.catch_warnings():
-                warnings.simplefilter("ignore")
-                r = requests.get(
-                    url,
-                    verify=False,
-                    headers={
-                        "User-Agent": USER_AGENT,
-                    },
-                    timeout=10,
-                    **kwargs,
-                )
-        except requests.RequestException as exc:
-            self._log.debug("lyrics request failed: {0}", exc)
-            return
-        if r.status_code == requests.codes.ok:
-            return r.text
-        else:
-            self._log.debug("failed to fetch: {0} ({1})", url, r.status_code)
-            return None
-
     def fetch(
         self, artist: str, title: str, album: str, length: int
-    ) -> str | None:
+    ) -> tuple[str, str] | None:
         raise NotImplementedError
 
 
-class LRCLibItem(TypedDict):
-    """Lyrics data item returned by the LRCLib API."""
-
-    id: int
-    name: str
-    trackName: str
-    artistName: str
-    albumName: str
-    duration: float | None
-    instrumental: bool
-    plainLyrics: str
-    syncedLyrics: str | None
-
-
 @dataclass
 @total_ordering
 class LRCLyrics:
@@ -296,6 +266,7 @@ class LRCLyrics:
     DURATION_DIFF_TOLERANCE = 0.05
 
     target_duration: float
+    id: int
     duration: float
     instrumental: bool
     plain: str
@@ -306,9 +277,12 @@ def __le__(self, other: LRCLyrics) -> bool:
         return self.dist < other.dist
 
     @classmethod
-    def make(cls, candidate: LRCLibItem, target_duration: float) -> LRCLyrics:
+    def make(
+        cls, candidate: LRCLibAPI.Item, target_duration: float
+    ) -> LRCLyrics:
         return cls(
             target_duration,
+            candidate["id"],
             candidate["duration"] or 0.0,
             candidate["instrumental"],
             candidate["plainLyrics"],
@@ -361,24 +335,9 @@ class LRCLib(Backend):
     GET_URL = f"{BASE_URL}/get"
     SEARCH_URL = f"{BASE_URL}/search"
 
-    def warn(self, message: str, *args) -> None:
-        """Log a warning message with the class name."""
-        self._log.warning(f"{self.__class__.__name__}: {message}", *args)
-
-    def fetch_json(self, *args, **kwargs):
-        """Wrap the request method to raise an exception on HTTP errors."""
-        kwargs.setdefault("timeout", 10)
-        kwargs.setdefault("headers", {"User-Agent": USER_AGENT})
-        r = requests.get(*args, **kwargs)
-        if r.status_code == HTTPStatus.NOT_FOUND:
-            raise NotFoundError("HTTP Error: Not Found", response=r)
-        r.raise_for_status()
-
-        return r.json()
-
     def fetch_candidates(
         self, artist: str, title: str, album: str, length: int
-    ) -> Iterator[list[LRCLibItem]]:
+    ) -> Iterator[list[LRCLibAPI.Item]]:
         """Yield lyrics candidates for the given song data.
 
         Firstly, attempt to GET lyrics directly, and then search the API if
@@ -403,23 +362,19 @@ def pick_best_match(cls, lyrics: Iterable[LRCLyrics]) -> LRCLyrics | None:
     def fetch(
         self, artist: str, title: str, album: str, length: int
-    ) -> str | None:
+    ) -> tuple[str, str] | None:
         """Fetch lyrics text for the given song data."""
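+        # Note on the pipeline below (illustrative summary): each batch of
+        # API results is turned into LRCLyrics candidates, the best
+        # candidate is picked per batch, and the first batch producing a
+        # valid pick wins.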
         fetch = partial(self.fetch_candidates, artist, title, album, length)
         make = partial(LRCLyrics.make, target_duration=length)
         pick = self.pick_best_match
         try:
-            return next(
+            item = next(
                 filter(None, map(pick, (map(make, x) for x in fetch())))
-            ).get_text(self.config["synced"])
+            )
         except StopIteration:
-            pass
-        except requests.JSONDecodeError:
-            self.warn("Could not decode response JSON data")
-        except requests.RequestException as exc:
-            self.warn("Request error: {}", exc)
+            return None
 
-        return None
+        return item.get_text(self.config["synced"]), f"{self.GET_URL}/{item.id}"
 
 
 class DirectBackend(Backend):
@@ -456,22 +411,18 @@ def encode(cls, text: str) -> str:
         return quote(unidecode(text))
 
-    def fetch(self, artist: str, title: str, *_) -> str | None:
+    def fetch(self, artist: str, title: str, *_) -> tuple[str, str] | None:
         url = self.build_url(artist, title)
 
-        html = self.fetch_url(url)
-        if not html:
-            return None
+        html = self.fetch_text(url)
         if "We detected that your IP is blocked" in html:
-            self._log.warning(
-                "we are blocked at MusixMatch: url %s failed" % url
-            )
+            self.warn("Failed: Blocked IP address")
             return None
         html_parts = html.split('<p class="mxm-lyrics__content')
         # Sometimes lyrics come in 2 or more parts
         lyrics_parts = []
         for html_part in html_parts:
             lyrics_parts.append(re.sub(r"^[^>]+>|</p>.*", "", html_part))
         lyrics = "\n".join(lyrics_parts)
         lyrics = lyrics.strip(',"').replace("\\n", "\n")
         # another odd case: sometimes only that string remains, for
@@ -482,151 +433,165 @@ def fetch(self, artist: str, title: str, *_) -> str | None:
         # sometimes there are non-existent lyrics with some content
         if "Lyrics | Musixmatch" in lyrics:
             return None
-        return lyrics
+        return lyrics, url
+
+
+class Html:
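+    # Each attribute below is a small HTML-cleanup callable built with
+    # ``partial(re.compile(...).sub, replacement)``; e.g. (illustrative)
+    # ``Html.expand_br("foo<br>bar")`` returns ``"foo\nbar"``.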

+    collapse_space = partial(re.compile(r"(^| ) +", re.M).sub, r"\1")
+    expand_br = partial(re.compile(r"\s*<br[^>]*>\s*", re.I).sub, "\n")
+    #: two newlines between paragraphs on the same line (musica, letras.mus.br)
+    merge_blocks = partial(re.compile(r"(?<!>)<p[^>]*>").sub, "\n\n")
+    #: a single new line between paragraphs on separate lines
+    #: (paroles.net, sweetslyrics.com, lacoccinelle.net)
+    merge_lines = partial(re.compile(r"</p>\s+<p[^>]*>(?!___)").sub, "\n")
+    #: remove empty divs (lacoccinelle.net)
+    remove_empty_tags = partial(
+        re.compile(r"(<(div|span)[^>]*>\s*</\2>)").sub, ""
+    )
+    #: remove Google Ads tags (musica.com)
+    remove_aside = partial(re.compile("<aside .+?</aside>").sub, "")
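+
+    # Illustrative, e.g.:
+    #     Html.remove_aside("a<aside class=x>ad</aside>b") == "ab"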