From 63784622e93f89a2c6d722936c8957e08f0361b2 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 1 Sep 2022 17:00:16 -0300 Subject: [PATCH 01/33] locate stream for streaming API by identifier --- lbry/extras/daemon/daemon.py | 9 +++++---- lbry/file/file_manager.py | 6 ++++-- lbry/file/source_manager.py | 1 + 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index 5d16812bff..ac8782d7f1 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -639,7 +639,7 @@ async def handle_stream_get_request(self, request: web.Request): stream = await self.jsonrpc_get(uri) if isinstance(stream, dict): raise web.HTTPServerError(text=stream['error']) - raise web.HTTPFound(f"/stream/{stream.sd_hash}") + raise web.HTTPFound(f"/stream/{stream.identifier}") async def handle_stream_range_request(self, request: web.Request): try: @@ -658,12 +658,13 @@ async def handle_stream_range_request(self, request: web.Request): log.debug("finished handling /stream range request") async def _handle_stream_range_request(self, request: web.Request): - sd_hash = request.path.split("/stream/")[1] + identifier = request.path.split("/stream/")[1] if not self.file_manager.started.is_set(): await self.file_manager.started.wait() - if sd_hash not in self.file_manager.streams: + stream = self.file_manager.get_filtered(identifier=identifier) + if not stream: return web.HTTPNotFound() - return await self.file_manager.stream_partial_content(request, sd_hash) + return await self.file_manager.stream_partial_content(request, identifier) async def _process_rpc_call(self, data): args = data.get('params', {}) diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 6ab9439070..cb5d8684a0 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -290,8 +290,10 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag ) ) - async def stream_partial_content(self, request: Request, sd_hash: str): - return await self.source_managers['stream'].stream_partial_content(request, sd_hash) + async def stream_partial_content(self, request: Request, identifier: str): + for source_manager in self.source_managers.values(): + if source_manager.get_filtered(identifier=identifier): + return await source_manager.stream_partial_content(request, identifier) def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]: """ diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py index 72c1709dd8..c6af4fac6d 100644 --- a/lbry/file/source_manager.py +++ b/lbry/file/source_manager.py @@ -23,6 +23,7 @@ class SourceManager: filter_fields = { + 'identifier', 'rowid', 'status', 'file_name', From 8212e73c2eae989d0bc2aa592da11070a27f66f7 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 1 Sep 2022 18:23:44 -0300 Subject: [PATCH 02/33] stream torrent from file --- lbry/torrent/torrent_manager.py | 53 +++++++++++++++++++++++++++++++-- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index cf9106731b..208392b76e 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -4,9 +4,11 @@ import os import typing from typing import Optional -from aiohttp.web import Request +from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable + from lbry.file.source_manager import SourceManager from lbry.file.source import ManagedDownloadSource +from lbry.schema.mime_types import guess_media_type if typing.TYPE_CHECKING: from lbry.torrent.session import TorrentSession @@ -49,6 +51,10 @@ def full_path(self) -> Optional[str]: self.download_directory = os.path.dirname(full_path) return full_path + @property + def mime_type(self) -> Optional[str]: + return guess_media_type(os.path.basename(self.full_path))[0] + async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): await self.torrent_session.add_torrent(self.identifier, self.download_directory) @@ -62,6 +68,10 @@ async def save_file(self, file_name: Optional[str] = None, download_directory: O def torrent_length(self): return self.torrent_session.get_size(self.identifier) + @property + def stream_length(self): + return os.path.getsize(self.full_path) + @property def written_bytes(self): return self.torrent_session.get_downloaded(self.identifier) @@ -81,6 +91,43 @@ def stop_tasks(self): def completed(self): return self.torrent_session.is_completed(self.identifier) + async def stream_file(self, request): + log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id, + self.identifier[:6]) + headers, size, start, end = self._prepare_range_response_headers( + request.headers.get('range', 'bytes=0-') + ) + await self.start() + response = StreamResponse( + status=206, + headers=headers + ) + await response.prepare(request) + with open(self.full_path, 'rb') as infile: + infile.seek(start) + await response.write_eof(infile.read(size)) + + def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]: + if '=' in get_range: + get_range = get_range.split('=')[1] + start, end = get_range.split('-') + size = self.stream_length + + start = int(start) + end = int(end) if end else size - 1 + + if end >= size or not 0 <= start < size: + raise HTTPRequestRangeNotSatisfiable() + + final_size = end - start + 1 + headers = { + 'Accept-Ranges': 'bytes', + 'Content-Range': f'bytes {start}-{end}/{size}', + 'Content-Length': str(final_size), + 'Content-Type': self.mime_type + } + return headers, final_size, start, end + class TorrentManager(SourceManager): _sources: typing.Dict[str, ManagedDownloadSource] @@ -136,5 +183,5 @@ async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[boo # await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) # await self.storage.delete_stream(source.descriptor) - async def stream_partial_content(self, request: Request, sd_hash: str): - raise NotImplementedError + async def stream_partial_content(self, request: Request, identifier: str): + return await self._sources[identifier].stream_file(request) From 78280417864e85bef99b8ec51476ab6cd882ada6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 1 Sep 2022 18:24:15 -0300 Subject: [PATCH 03/33] stream type independent stream_url --- lbry/extras/daemon/json_response_encoder.py | 7 +------ lbry/file/source.py | 6 +++--- lbry/stream/managed_stream.py | 4 ---- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index 75eacdb9ad..bb4aefc41b 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -285,7 +285,7 @@ def encode_file(self, managed_stream): else: total_bytes_lower_bound = total_bytes = managed_stream.torrent_length result = { - 'streaming_url': None, + 'streaming_url': managed_stream.stream_url, 'completed': managed_stream.completed, 'file_name': None, 'download_directory': None, @@ -326,7 +326,6 @@ def encode_file(self, managed_stream): } if is_stream: result.update({ - 'streaming_url': managed_stream.stream_url, 'stream_hash': managed_stream.stream_hash, 'stream_name': managed_stream.stream_name, 'suggested_file_name': managed_stream.suggested_file_name, @@ -340,10 +339,6 @@ def encode_file(self, managed_stream): 'reflector_progress': managed_stream.reflector_progress, 'uploading_to_reflector': managed_stream.uploading_to_reflector }) - else: - result.update({ - 'streaming_url': f'file://{managed_stream.full_path}', - }) if output_exists: result.update({ 'file_name': managed_stream.file_name, diff --git a/lbry/file/source.py b/lbry/file/source.py index ba5bb311f4..519327629e 100644 --- a/lbry/file/source.py +++ b/lbry/file/source.py @@ -99,9 +99,9 @@ def status(self) -> str: def completed(self): raise NotImplementedError() - # @property - # def stream_url(self): - # return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash} + @property + def stream_url(self): + return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.identifier}" @property def finished(self) -> bool: diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index a6be77ce48..b089628201 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -104,10 +104,6 @@ def written_bytes(self) -> int: def completed(self): return self.written_bytes >= self.descriptor.lower_bound_decrypted_length() - @property - def stream_url(self): - return f"http://{self.config.streaming_host}:{self.config.streaming_port}/stream/{self.sd_hash}" - async def update_status(self, status: str): assert status in [self.STATUS_RUNNING, self.STATUS_STOPPED, self.STATUS_FINISHED] self._status = status From 8ee5cee8c304ea573d75e69afe9950c3733833e4 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 5 Sep 2022 10:50:22 -0300 Subject: [PATCH 04/33] update flags, set sequential as a flag --- lbry/torrent/session.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 713d820392..27bd4c7832 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -14,6 +14,8 @@ DEFAULT_FLAGS = ( # fixme: somehow the logic here is inverted? libtorrent.add_torrent_params_flags_t.flag_auto_managed | libtorrent.add_torrent_params_flags_t.flag_update_subscribe + | libtorrent.add_torrent_params_flags_t.flag_sequential_download + | libtorrent.add_torrent_params_flags_t.flag_paused ) @@ -31,7 +33,6 @@ def __init__(self, loop, executor, handle): self.tasks = [] self.torrent_file: Optional[libtorrent.file_storage] = None self._base_path = None - self._handle.set_sequential_download(1) @property def largest_file(self) -> Optional[str]: From 6efd4dd19a8355e9894127de1343eadb2c1500b3 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 5 Sep 2022 10:52:42 -0300 Subject: [PATCH 05/33] fix save path, fix prio, update deprecated calls --- lbry/torrent/session.py | 31 ++++++++++++++++++++----------- lbry/torrent/torrent_manager.py | 2 +- 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 27bd4c7832..0bd058529d 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -36,10 +36,14 @@ def __init__(self, loop, executor, handle): @property def largest_file(self) -> Optional[str]: - if not self.torrent_file: + if self.torrent_file is None: return None index = self.largest_file_index - return os.path.join(self._base_path, self.torrent_file.at(index).path) + return os.path.join(self._base_path, self.torrent_file.file_path(index)) + + @property + def save_path(self) -> Optional[str]: + return self._base_path @property def largest_file_index(self): @@ -66,15 +70,18 @@ def _show_status(self): if not self.metadata_completed.is_set(): self.metadata_completed.set() log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) - self.torrent_file = self._handle.get_torrent_info().files() + self.torrent_file = self._handle.torrent_file().files() self._base_path = status.save_path - first_piece = self.torrent_file.at(self.largest_file_index).offset + first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index) if not self.started.is_set(): if self._handle.have_piece(first_piece): self.started.set() else: # prioritize it self._handle.set_piece_deadline(first_piece, 100) + prios = self._handle.piece_priorities() + prios[first_piece] = 7 + self._handle.prioritize_pieces(prios) if not status.is_seeding: log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, @@ -177,6 +184,9 @@ def _add_torrent(self, btih: str, download_directory: Optional[str]): def full_path(self, btih): return self._handles[btih].largest_file + def save_path(self, btih): + return self._handles[btih].save_path + async def add_torrent(self, btih, download_path): await self._loop.run_in_executor( self._executor, self._add_torrent, btih, download_path @@ -239,17 +249,16 @@ async def main(): executor = None session = TorrentSession(asyncio.get_event_loop(), executor) - session2 = TorrentSession(asyncio.get_event_loop(), executor) - await session.bind('localhost', port=4040) - await session2.bind('localhost', port=4041) - btih = await session.add_fake_torrent() - session2._session.add_dht_node(('localhost', 4040)) - await session2.add_torrent(btih, "/tmp/down") + await session.bind() + await session.add_torrent(btih, os.path.expanduser("~/Downloads")) while True: - await asyncio.sleep(100) + session.full_path(btih) + await asyncio.sleep(1) await session.pause() executor.shutdown() if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)-4s %(name)s:%(lineno)d: %(message)s") + log = logging.getLogger(__name__) asyncio.run(main()) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 208392b76e..a884caf9ac 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -48,7 +48,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: ' @property def full_path(self) -> Optional[str]: full_path = self.torrent_session.full_path(self.identifier) - self.download_directory = os.path.dirname(full_path) + self.download_directory = self.torrent_session.save_path(self.identifier) return full_path @property From b3bff39eeabcbae990e4ad86e4e0969f5a2a2e87 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Mon, 5 Sep 2022 13:11:00 -0300 Subject: [PATCH 06/33] stream from torrent pieces, holding the response until the piece is completed --- lbry/torrent/session.py | 33 ++++++++++++++++++++++++++++++--- lbry/torrent/torrent_manager.py | 12 ++++++++---- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 0bd058529d..2df515d94f 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -5,7 +5,7 @@ import random from hashlib import sha1 from tempfile import mkdtemp -from typing import Optional +from typing import Optional, Tuple import libtorrent @@ -31,9 +31,13 @@ def __init__(self, loop, executor, handle): self.total_wanted_done = 0 self.name = '' self.tasks = [] - self.torrent_file: Optional[libtorrent.file_storage] = None + self._torrent_info: libtorrent.torrent_info = handle.torrent_file() self._base_path = None + @property + def torrent_file(self) -> Optional[libtorrent.file_storage]: + return self._torrent_info.files() + @property def largest_file(self) -> Optional[str]: if self.torrent_file is None: @@ -58,6 +62,25 @@ def stop_tasks(self): while self.tasks: self.tasks.pop().cancel() + def byte_range_to_piece_range( + self, file_index, start_offset, end_offset) -> Tuple[libtorrent.peer_request, libtorrent.peer_request]: + start_piece = self._torrent_info.map_file(file_index, start_offset, 0) + end_piece = self._torrent_info.map_file(file_index, end_offset, 0) + return start_piece, end_piece + + async def stream_range_as_completed(self, file_index, start, end): + first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end) + start_piece_offset = final_piece.start + piece_size = self._torrent_info.piece_length() + log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d): %s", + first_piece.piece, final_piece.piece, start, end, self.name) + for piece_index in range(first_piece.piece, final_piece.piece + 1): + while not self._handle.have_piece(piece_index): + log.info("Waiting for piece %d: %s", piece_index, self.name) + await asyncio.sleep(0.2) + log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name) + yield piece_size - start_piece_offset + def _show_status(self): # fixme: cleanup if not self._handle.is_valid(): @@ -69,8 +92,8 @@ def _show_status(self): self.name = status.name if not self.metadata_completed.is_set(): self.metadata_completed.set() + self._torrent_info = self._handle.torrent_file() log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) - self.torrent_file = self._handle.torrent_file().files() self._base_path = status.save_path first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index) if not self.started.is_set(): @@ -220,6 +243,10 @@ def get_downloaded(self, btih): def is_completed(self, btih): return self._handles[btih].finished.is_set() + def stream_largest_file(self, btih, start, end): + handle = self._handles[btih] + return handle.stream_range_as_completed(handle.largest_file_index, start, end) + def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index a884caf9ac..da839850ca 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -94,7 +94,7 @@ def completed(self): async def stream_file(self, request): log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id, self.identifier[:6]) - headers, size, start, end = self._prepare_range_response_headers( + headers, start, end = self._prepare_range_response_headers( request.headers.get('range', 'bytes=0-') ) await self.start() @@ -105,9 +105,13 @@ async def stream_file(self, request): await response.prepare(request) with open(self.full_path, 'rb') as infile: infile.seek(start) - await response.write_eof(infile.read(size)) + async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end): + if start + read_size < end: + await response.write(infile.read(read_size)) + else: + await response.write_eof(infile.read(end - infile.tell())) - def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]: + def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]: if '=' in get_range: get_range = get_range.split('=')[1] start, end = get_range.split('-') @@ -126,7 +130,7 @@ def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing 'Content-Length': str(final_size), 'Content-Type': self.mime_type } - return headers, final_size, start, end + return headers, start, end class TorrentManager(SourceManager): From b2f82070b03af29260e83b67bb73718dc324d9d5 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 7 Sep 2022 18:59:06 -0300 Subject: [PATCH 07/33] piece prioritization and deadlines --- lbry/torrent/session.py | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 2df515d94f..588178582c 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -74,9 +74,12 @@ async def stream_range_as_completed(self, file_index, start, end): piece_size = self._torrent_info.piece_length() log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d): %s", first_piece.piece, final_piece.piece, start, end, self.name) + self.prioritize(file_index, start, end) + await self.resume() for piece_index in range(first_piece.piece, final_piece.piece + 1): while not self._handle.have_piece(piece_index): log.info("Waiting for piece %d: %s", piece_index, self.name) + self._handle.set_piece_deadline(piece_index, 0) await asyncio.sleep(0.2) log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name) yield piece_size - start_piece_offset @@ -94,25 +97,32 @@ def _show_status(self): self.metadata_completed.set() self._torrent_info = self._handle.torrent_file() log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) + # prioritize first 2mb + self.prioritize(self.largest_file_index, 0, 2 * 1024 * 1024) self._base_path = status.save_path first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index) if not self.started.is_set(): if self._handle.have_piece(first_piece): + log.debug("Got first piece, set started - %s", self.name) self.started.set() - else: - # prioritize it - self._handle.set_piece_deadline(first_piece, 100) - prios = self._handle.piece_priorities() - prios[first_piece] = 7 - self._handle.prioritize_pieces(prios) - if not status.is_seeding: - log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', - status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, - status.num_peers, status.num_seeds, status.state, status.save_path) - elif not self.finished.is_set(): + log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', + status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, + status.num_peers, status.num_seeds, status.state, status.save_path) + if (status.is_finished or status.is_seeding) and not self.finished.is_set(): self.finished.set() log.info("Torrent finished: %s", self.name) + def prioritize(self, file_index, start, end, cleanup=False): + first_piece, last_piece = self.byte_range_to_piece_range(file_index, start, end) + priorities = self._handle.get_piece_priorities() + priorities = [0 if cleanup else 1 for _ in priorities] + self._handle.clear_piece_deadlines() + for idx, piece_number in enumerate(range(first_piece.piece, last_piece.piece + 1)): + priorities[piece_number] = 7 - idx if 0 <= idx <= 6 else 1 + self._handle.set_piece_deadline(piece_number, idx) + log.debug("Prioritizing pieces for %s: %s", self.name, priorities) + self._handle.prioritize_pieces(priorities) + async def status_loop(self): while True: self._show_status() From df680e722542103d8d7a78567a46b93c58fc1362 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 9 Sep 2022 02:32:38 -0300 Subject: [PATCH 08/33] fix tests and off by one error --- lbry/torrent/session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 588178582c..8d2dc6d91a 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -89,6 +89,7 @@ def _show_status(self): if not self._handle.is_valid(): return status = self._handle.status() + self._base_path = status.save_path if status.has_metadata: self.size = status.total_wanted self.total_wanted_done = status.total_wanted_done @@ -99,7 +100,6 @@ def _show_status(self): log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) # prioritize first 2mb self.prioritize(self.largest_file_index, 0, 2 * 1024 * 1024) - self._base_path = status.save_path first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index) if not self.started.is_set(): if self._handle.have_piece(first_piece): @@ -117,7 +117,7 @@ def prioritize(self, file_index, start, end, cleanup=False): priorities = self._handle.get_piece_priorities() priorities = [0 if cleanup else 1 for _ in priorities] self._handle.clear_piece_deadlines() - for idx, piece_number in enumerate(range(first_piece.piece, last_piece.piece + 1)): + for idx, piece_number in enumerate(range(first_piece.piece, last_piece.piece)): priorities[piece_number] = 7 - idx if 0 <= idx <= 6 else 1 self._handle.set_piece_deadline(piece_number, idx) log.debug("Prioritizing pieces for %s: %s", self.name, priorities) From 7410991123d4c5da69974da1b5253291a5ec3d11 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 9 Sep 2022 20:19:19 -0300 Subject: [PATCH 09/33] save resume data on stop, remove/replace deprecated calls --- lbry/torrent/session.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 8d2dc6d91a..6a5e16a3a5 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -3,7 +3,6 @@ import os import logging import random -from hashlib import sha1 from tempfile import mkdtemp from typing import Optional, Tuple @@ -59,6 +58,7 @@ def largest_file_index(self): return index def stop_tasks(self): + self._handle.save_resume_data() while self.tasks: self.tasks.pop().cancel() @@ -152,13 +152,13 @@ def __init__(self, loop, executor): async def add_fake_torrent(self): tmpdir = mkdtemp() - info, btih = _create_fake_torrent(tmpdir) + info = _create_fake_torrent(tmpdir) flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode handle = self._session.add_torrent({ 'ti': info, 'save_path': tmpdir, 'flags': flags }) - self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) - return btih + self._handles[str(info.info_hash())] = TorrentHandle(self._loop, self._executor, handle) + return str(info.info_hash()) async def bind(self, interface: str = '0.0.0.0', port: int = 10889): settings = { @@ -172,14 +172,12 @@ async def bind(self, interface: str = '0.0.0.0', port: int = 10889): self.tasks.append(self._loop.create_task(self.process_alerts())) def stop(self): + while self._handles: + self._handles.popitem()[1].stop_tasks() while self.tasks: self.tasks.pop().cancel() self._session.save_state() self._session.pause() - self._session.stop_dht() - self._session.stop_lsd() - self._session.stop_natpmp() - self._session.stop_upnp() self._session = None def _pop_alerts(self): @@ -271,9 +269,7 @@ def _create_fake_torrent(tmpdir): file_storage.add_file('tmp', size) t = libtorrent.create_torrent(file_storage, 0, 4 * 1024 * 1024) libtorrent.set_piece_hashes(t, tmpdir) - info = libtorrent.torrent_info(t.generate()) - btih = sha1(info.metadata()).hexdigest() - return info, btih + return libtorrent.torrent_info(t.generate()) async def main(): From dd103d0f955ccb16f8b3b7d4c2925e81fe5e3c4a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 15 Sep 2022 21:01:41 -0300 Subject: [PATCH 10/33] save file-torrent association for file list --- lbry/extras/daemon/storage.py | 23 ++++++++++++++++------- lbry/torrent/torrent_manager.py | 7 +++++++ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 81b4263dc3..81b6269823 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -211,7 +211,7 @@ def delete_torrent(transaction: sqlite3.Connection, bt_infohash: str): transaction.execute("delete from torrent where bt_infohash=?", (bt_infohash,)).fetchall() -def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typing.Optional[str], +def store_file(transaction: sqlite3.Connection, identifier_value: str, file_name: typing.Optional[str], download_directory: typing.Optional[str], data_payment_rate: float, status: str, content_fee: typing.Optional[Transaction], added_on: typing.Optional[int] = None) -> int: if not file_name and not download_directory: @@ -219,15 +219,18 @@ def store_file(transaction: sqlite3.Connection, stream_hash: str, file_name: typ else: encoded_file_name = binascii.hexlify(file_name.encode()).decode() encoded_download_dir = binascii.hexlify(download_directory.encode()).decode() + is_torrent = len(identifier_value) == 40 time_added = added_on or int(time.time()) transaction.execute( - "insert or replace into file values (?, NULL, ?, ?, ?, ?, ?, ?, ?)", - (stream_hash, encoded_file_name, encoded_download_dir, data_payment_rate, status, + f"insert or replace into file values ({'NULL, ?' if is_torrent else '?, NULL'}, ?, ?, ?, ?, ?, ?, ?)", + (identifier_value, encoded_file_name, encoded_download_dir, data_payment_rate, status, 1 if (file_name and download_directory and os.path.isfile(os.path.join(download_directory, file_name))) else 0, None if not content_fee else binascii.hexlify(content_fee.raw).decode(), time_added) ).fetchall() - return transaction.execute("select rowid from file where stream_hash=?", (stream_hash, )).fetchone()[0] + return transaction.execute( + f"select rowid from file where {'bt_infohash' if is_torrent else 'stream_hash'}=?", + (identifier_value, )).fetchone()[0] class SQLiteStorage(SQLiteMixin): @@ -872,11 +875,17 @@ async def save_content_claim(self, stream_hash, claim_outpoint): if stream_hash in self.content_claim_callbacks: await self.content_claim_callbacks[stream_hash]() + def _save_torrent(self, transaction, bt_infohash, length, name): + transaction.execute( + "insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name) + ).fetchall() + + async def add_torrent(self, bt_infohash, length, name): + return await self.db.run(self._save_torrent, bt_infohash, length, name) + async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name): def _save_torrent(transaction): - transaction.execute( - "insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name) - ).fetchall() + self._save_torrent(transaction, bt_infohash, length, name) transaction.execute( "insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint) ).fetchall() diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index da839850ca..1e91d14109 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -3,6 +3,7 @@ import logging import os import typing +from pathlib import Path from typing import Optional from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable @@ -57,6 +58,12 @@ def mime_type(self) -> Optional[str]: async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): await self.torrent_session.add_torrent(self.identifier, self.download_directory) + self.download_directory = self.torrent_session.save_path(self.identifier) + self._file_name = Path(self.torrent_session.full_path(self.identifier)).name + await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) + self.rowid = await self.storage.save_downloaded_file( + self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on + ) async def stop(self, finished: bool = False): await self.torrent_session.remove_torrent(self.identifier) From 7746ded9b6bebbccc7031a76b3b47a48e1bec708 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 16 Sep 2022 00:37:05 -0300 Subject: [PATCH 11/33] add test case for restart, fix torrent file update --- lbry/extras/daemon/storage.py | 12 +++++++++++- lbry/file/file_manager.py | 4 ++-- lbry/file/source_manager.py | 1 + lbry/torrent/torrent_manager.py | 6 ++++-- tests/integration/datanetwork/test_file_commands.py | 8 ++++++++ 5 files changed, 26 insertions(+), 5 deletions(-) diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 81b6269823..5790bccf18 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -5,6 +5,7 @@ import asyncio import binascii import time +from operator import itemgetter from typing import Optional from lbry.wallet import SQLiteMixin from lbry.conf import Config @@ -635,6 +636,15 @@ def update_db_removed(transaction: sqlite3.Connection, removed): def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: return self.db.run(get_all_lbry_files) + async def get_all_torrent_files(self) -> typing.List[typing.Dict]: + def _get_all_torrent_files(transaction): + cursor = transaction.execute("select * from file join torrent on file.bt_infohash=torrent.bt_infohash") + return [ + {field: value for field, value in zip(list(map(itemgetter(0), cursor.description)), row)} + for row in cursor.fetchall() + ] + return await self.db.run(_get_all_torrent_files) + def change_file_status(self, stream_hash: str, new_status: str): log.debug("update file status %s -> %s", stream_hash, new_status) return self.db.execute_fetchall("update file set status=? where stream_hash=?", (new_status, stream_hash)) @@ -907,7 +917,7 @@ async def get_content_claim(self, stream_hash: str, include_supports: typing.Opt async def get_content_claim_for_torrent(self, bt_infohash): claims = await self.db.run(get_claims_from_torrent_info_hashes, [bt_infohash]) - return claims[bt_infohash].as_dict() if claims else None + return claims[bt_infohash] if claims else None # # # # # # # # # reflector functions # # # # # # # # # diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index cb5d8684a0..9cf666a81c 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -139,7 +139,7 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag existing[0].identifier, outpoint, existing[0].torrent_length, existing[0].torrent_name ) claim_info = await self.storage.get_content_claim_for_torrent(existing[0].identifier) - existing[0].set_claim(claim_info, claim) + existing[0].set_claim(claim_info.as_dict() if claim_info else None, claim) else: await self.storage.save_content_claim( existing[0].stream_hash, outpoint @@ -242,7 +242,7 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag stream.identifier, outpoint, stream.torrent_length, stream.torrent_name ) claim_info = await self.storage.get_content_claim_for_torrent(stream.identifier) - stream.set_claim(claim_info, claim) + stream.set_claim(claim_info.as_dict() if claim_info else None, claim) if save_file: await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download)) return stream diff --git a/lbry/file/source_manager.py b/lbry/file/source_manager.py index c6af4fac6d..980f582d2b 100644 --- a/lbry/file/source_manager.py +++ b/lbry/file/source_manager.py @@ -84,6 +84,7 @@ async def create(self, file_path: str, key: Optional[bytes] = None, raise NotImplementedError() async def delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): + await self.storage.delete_torrent(source.identifier) self.remove(source) if delete_file and source.output_file_exists: os.remove(source.full_path) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 1e91d14109..76e4bc32e8 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -161,7 +161,7 @@ async def recover_streams(self, file_infos: typing.List[typing.Dict]): async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[str], download_directory: Optional[str], status: str, claim: Optional['StoredContentClaim'], content_fee: Optional['Transaction'], - added_on: Optional[int]): + added_on: Optional[int], **kwargs): stream = TorrentSource( self.loop, self.config, self.storage, identifier=bt_infohash, file_name=file_name, download_directory=download_directory, status=status, claim=claim, rowid=rowid, @@ -171,7 +171,9 @@ async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[s self.add(stream) async def initialize_from_database(self): - pass + for file in await self.storage.get_all_torrent_files(): + claim = await self.storage.get_content_claim_for_torrent(file['bt_infohash']) + await self._load_stream(None, claim=claim, **file) async def start(self): await super().start() diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 95e92ce1e9..02b7a8c1c4 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -66,6 +66,14 @@ async def test_download_torrent(self): # claim now points to another torrent, update to it self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + + # restart and verify that only one updated stream was recovered + self.daemon.file_manager.stop() + await self.daemon.file_manager.start() + self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih) + self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + self.assertIn(new_btih, self.client_session._handles) self.assertNotIn(btih, self.client_session._handles) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) From 37adc59b378a53a9cdcd2d803d087c9494c5fb8b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 23 Sep 2022 10:58:38 -0300 Subject: [PATCH 12/33] add tests for streaming, fix bugs --- lbry/torrent/session.py | 8 +++---- lbry/torrent/torrent_manager.py | 5 ++-- .../datanetwork/test_file_commands.py | 24 +++++++++++++++++++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 6a5e16a3a5..164f963f91 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -26,7 +26,7 @@ def __init__(self, loop, executor, handle): self.started = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop) - self.size = 0 + self.size = handle.status().total_wanted self.total_wanted_done = 0 self.name = '' self.tasks = [] @@ -70,10 +70,10 @@ def byte_range_to_piece_range( async def stream_range_as_completed(self, file_index, start, end): first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end) - start_piece_offset = final_piece.start + start_piece_offset = first_piece.start piece_size = self._torrent_info.piece_length() - log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d): %s", - first_piece.piece, final_piece.piece, start, end, self.name) + log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s", + first_piece.piece, final_piece.piece, start, end, piece_size, self.name) self.prioritize(file_index, start, end) await self.resume() for piece_index in range(first_piece.piece, final_piece.piece + 1): diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 76e4bc32e8..fc29bb7946 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -113,10 +113,11 @@ async def stream_file(self, request): with open(self.full_path, 'rb') as infile: infile.seek(start) async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end): - if start + read_size < end: + if infile.tell() + read_size < end: await response.write(infile.read(read_size)) else: - await response.write_eof(infile.read(end - infile.tell())) + await response.write_eof(infile.read(end - infile.tell() + 1)) + return response def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]: if '=' in get_range: diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 02b7a8c1c4..48838dc18a 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -4,11 +4,14 @@ import os from binascii import hexlify +import aiohttp.web + from lbry.schema import Claim from lbry.stream.background_downloader import BackgroundDownloader from lbry.stream.descriptor import StreamDescriptor from lbry.testcase import CommandTestCase from lbry.extras.daemon.components import TorrentSession, BACKGROUND_DOWNLOADER_COMPONENT +from lbry.utils import aiohttp_request from lbry.wallet import Transaction from lbry.torrent.tracker import UDPTrackerServerProtocol @@ -51,6 +54,23 @@ async def initialize_torrent(self, tx_to_update=None): self.addCleanup(task.cancel) return tx, btih + async def assert_torrent_streaming_works(self, btih): + url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/torrent' + if self.daemon.streaming_runner.server is None: + await self.daemon.streaming_runner.setup() + site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host, + self.daemon.conf.streaming_port) + await site.start() + async with aiohttp_request('get', url) as req: + self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream') + content_range = req.headers.get('Content-Range') + content_length = int(req.headers.get('Content-Length')) + streamed_bytes = await req.content.read() + expected_size = self.seeder_session.get_size(btih) + self.assertEqual(expected_size, len(streamed_bytes)) + self.assertEqual(content_length, len(streamed_bytes)) + self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range) + @skipIf(TorrentSession is None, "libtorrent not installed") async def test_download_torrent(self): tx, btih = await self.initialize_torrent() @@ -61,6 +81,10 @@ async def test_download_torrent(self): self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih) self.assertIn(btih, self.client_session._handles) + + # stream over streaming API (full range of the largest file) + await self.assert_torrent_streaming_works(btih) + tx, new_btih = await self.initialize_torrent(tx) self.assertNotEqual(btih, new_btih) # claim now points to another torrent, update to it From 7c7e18534ec7621ed80c6c267051e5abe5625dc9 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 23 Sep 2022 11:28:32 -0300 Subject: [PATCH 13/33] refactor add_torrent, lints --- lbry/extras/daemon/storage.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index 5790bccf18..a36f7e9698 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -639,11 +639,8 @@ def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: async def get_all_torrent_files(self) -> typing.List[typing.Dict]: def _get_all_torrent_files(transaction): cursor = transaction.execute("select * from file join torrent on file.bt_infohash=torrent.bt_infohash") - return [ - {field: value for field, value in zip(list(map(itemgetter(0), cursor.description)), row)} - for row in cursor.fetchall() - ] - return await self.db.run(_get_all_torrent_files) + return map(lambda row: dict(zip(list(map(itemgetter(0), cursor.description)), row)), cursor.fetchall()) + return list(await self.db.run(_get_all_torrent_files)) def change_file_status(self, stream_hash: str, new_status: str): log.debug("update file status %s -> %s", stream_hash, new_status) @@ -885,21 +882,20 @@ async def save_content_claim(self, stream_hash, claim_outpoint): if stream_hash in self.content_claim_callbacks: await self.content_claim_callbacks[stream_hash]() - def _save_torrent(self, transaction, bt_infohash, length, name): - transaction.execute( - "insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name) - ).fetchall() - async def add_torrent(self, bt_infohash, length, name): - return await self.db.run(self._save_torrent, bt_infohash, length, name) + def _save_torrent(transaction, bt_infohash, length, name): + transaction.execute( + "insert or replace into torrent values (?, NULL, ?, ?)", (bt_infohash, length, name) + ).fetchall() + return await self.db.run(_save_torrent, bt_infohash, length, name) async def save_torrent_content_claim(self, bt_infohash, claim_outpoint, length, name): - def _save_torrent(transaction): - self._save_torrent(transaction, bt_infohash, length, name) + def _save_torrent_claim(transaction): transaction.execute( "insert or replace into content_claim values (NULL, ?, ?)", (bt_infohash, claim_outpoint) ).fetchall() - await self.db.run(_save_torrent) + await self.add_torrent(bt_infohash, length, name) + await self.db.run(_save_torrent_claim) # update corresponding ManagedEncryptedFileDownloader object if bt_infohash in self.content_claim_callbacks: await self.content_claim_callbacks[bt_infohash]() From f650e8f07e3ac9ded7902eba2b9251e47418bd15 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 28 Sep 2022 21:51:39 -0300 Subject: [PATCH 14/33] test and bugfixes for streaming multifile in a subfolder case --- lbry/torrent/session.py | 27 ++++++++++++------- .../datanetwork/test_file_commands.py | 4 +-- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index 164f963f91..a145a31320 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -42,16 +42,20 @@ def largest_file(self) -> Optional[str]: if self.torrent_file is None: return None index = self.largest_file_index - return os.path.join(self._base_path, self.torrent_file.file_path(index)) + return os.path.join(self.save_path, self.torrent_file.file_path(index)) @property def save_path(self) -> Optional[str]: + if not self._base_path: + self._base_path = self._handle.status().save_path return self._base_path @property def largest_file_index(self): largest_size, index = 0, 0 for file_num in range(self.torrent_file.num_files()): + if '.pad' in self.torrent_file.file_path(file_num): + continue # ignore padding files if self.torrent_file.file_size(file_num) > largest_size: largest_size = self.torrent_file.file_size(file_num) index = file_num @@ -150,9 +154,9 @@ def __init__(self, loop, executor): self.tasks = [] self.wait_start = True - async def add_fake_torrent(self): + async def add_fake_torrent(self, file_count=1): tmpdir = mkdtemp() - info = _create_fake_torrent(tmpdir) + info = _create_fake_torrent(tmpdir, file_count=file_count) flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode handle = self._session.add_torrent({ 'ti': info, 'save_path': tmpdir, 'flags': flags @@ -260,14 +264,17 @@ def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" -def _create_fake_torrent(tmpdir): - # beware, that's just for testing - path = os.path.join(tmpdir, 'tmp') - with open(path, 'wb') as myfile: - size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024) +def _create_fake_torrent(tmpdir, file_count=1): + # layout: subdir/tmp{0..file_count-1} 40k files. v1+v2. automatic piece size. file_storage = libtorrent.file_storage() - file_storage.add_file('tmp', size) - t = libtorrent.create_torrent(file_storage, 0, 4 * 1024 * 1024) + subfolder = os.path.join(tmpdir, "subdir") + os.mkdir(subfolder) + for file_number in range(file_count): + file_name = f"tmp{file_number}" + with open(os.path.join(subfolder, file_name), 'wb') as myfile: + size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024) + file_storage.add_file(os.path.join("subdir", file_name), size) + t = libtorrent.create_torrent(file_storage, 0, 0) libtorrent.set_piece_hashes(t, tmpdir) return libtorrent.torrent_info(t.generate()) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 48838dc18a..d3af87fc07 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -32,7 +32,7 @@ async def initialize_torrent(self, tx_to_update=None): self.seeder_session = TorrentSession(self.loop, None) self.addCleanup(self.seeder_session.stop) await self.seeder_session.bind('127.0.0.1', port=4040) - btih = await self.seeder_session.add_fake_torrent() + btih = await self.seeder_session.add_fake_torrent(file_count=3) address = await self.account.receiving.get_or_create_usable_address() if not tx_to_update: claim = Claim() @@ -66,7 +66,7 @@ async def assert_torrent_streaming_works(self, btih): content_range = req.headers.get('Content-Range') content_length = int(req.headers.get('Content-Length')) streamed_bytes = await req.content.read() - expected_size = self.seeder_session.get_size(btih) + expected_size = os.path.getsize(self.seeder_session.full_path(btih)) self.assertEqual(expected_size, len(streamed_bytes)) self.assertEqual(content_length, len(streamed_bytes)) self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range) From e862c99f6c5588a02279330d27e6dae0ee543461 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 29 Sep 2022 17:29:26 -0300 Subject: [PATCH 15/33] generate 3 files, check that streamed is the largest, add method to list files --- lbry/torrent/session.py | 20 ++++++++++++++----- .../datanetwork/test_file_commands.py | 2 ++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index a145a31320..a8308d4499 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -4,7 +4,7 @@ import logging import random from tempfile import mkdtemp -from typing import Optional, Tuple +from typing import Optional, Tuple, Dict import libtorrent @@ -154,7 +154,7 @@ def __init__(self, loop, executor): self.tasks = [] self.wait_start = True - async def add_fake_torrent(self, file_count=1): + async def add_fake_torrent(self, file_count=3): tmpdir = mkdtemp() info = _create_fake_torrent(tmpdir, file_count=file_count) flags = libtorrent.add_torrent_params_flags_t.flag_seed_mode @@ -259,20 +259,30 @@ def stream_largest_file(self, btih, start, end): handle = self._handles[btih] return handle.stream_range_as_completed(handle.largest_file_index, start, end) + def get_files(self, btih) -> Dict: + handle = self._handles[btih] + return { + handle.torrent_file.file_path(file_num): handle.torrent_file.file_size(file_num) + for file_num in range(handle.torrent_file.num_files()) + if '.pad' not in handle.torrent_file.file_path(file_num) + } + def get_magnet_uri(btih): return f"magnet:?xt=urn:btih:{btih}" -def _create_fake_torrent(tmpdir, file_count=1): - # layout: subdir/tmp{0..file_count-1} 40k files. v1+v2. automatic piece size. +def _create_fake_torrent(tmpdir, file_count=3, largest_index=1): + # layout: subdir/tmp{0..file_count-1} files. v1+v2. automatic piece size. + # largest_index: which file index {0 ... file_count} will be the largest file file_storage = libtorrent.file_storage() subfolder = os.path.join(tmpdir, "subdir") os.mkdir(subfolder) for file_number in range(file_count): file_name = f"tmp{file_number}" with open(os.path.join(subfolder, file_name), 'wb') as myfile: - size = myfile.write(bytes([random.randint(0, 255) for _ in range(40)]) * 1024) + size = myfile.write( + bytes([random.randint(0, 255) for _ in range(10 - abs(file_number - largest_index))]) * 1024) file_storage.add_file(os.path.join("subdir", file_name), size) t = libtorrent.create_torrent(file_storage, 0, 0) libtorrent.set_piece_hashes(t, tmpdir) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index d3af87fc07..fdcdff3559 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -71,6 +71,8 @@ async def assert_torrent_streaming_works(self, btih): self.assertEqual(content_length, len(streamed_bytes)) self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range) + self.assertEqual(len(streamed_bytes), max(self.seeder_session.get_files(btih).values())) + @skipIf(TorrentSession is None, "libtorrent not installed") async def test_download_torrent(self): tx, btih = await self.initialize_torrent() From c8f25027fcb02bab4f6ed9d7b9f073a611ab8c5a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 29 Sep 2022 22:12:24 -0300 Subject: [PATCH 16/33] start the stream after adding --- lbry/torrent/torrent_manager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index fc29bb7946..35e59f5a12 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -170,6 +170,7 @@ async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[s torrent_session=self.torrent_session ) self.add(stream) + await stream.start() async def initialize_from_database(self): for file in await self.storage.get_all_torrent_files(): From af0ad417dfcba2fca98bdf9e37451c0ed1c4361d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 11 Oct 2022 23:27:56 -0300 Subject: [PATCH 17/33] generalize DownloadSDTimeout to DownloadMetadata timeout + fix usages --- lbry/error/README.md | 4 ++-- lbry/error/__init__.py | 6 +++--- lbry/extras/daemon/daemon.py | 4 ++-- lbry/file/file_manager.py | 6 +++--- lbry/stream/downloader.py | 4 ++-- lbry/stream/managed_stream.py | 4 ++-- lbry/torrent/torrent_manager.py | 8 +++++++- tests/unit/stream/test_stream_manager.py | 10 +++++----- 8 files changed, 26 insertions(+), 20 deletions(-) diff --git a/lbry/error/README.md b/lbry/error/README.md index cc5ab7a3b5..2c747408ed 100644 --- a/lbry/error/README.md +++ b/lbry/error/README.md @@ -81,8 +81,8 @@ Code | Name | Message 511 | CorruptBlob | Blobs is corrupted. 520 | BlobFailedEncryption | Failed to encrypt blob. 531 | DownloadCancelled | Download was canceled. -532 | DownloadSDTimeout | Failed to download sd blob {download} within timeout. -533 | DownloadDataTimeout | Failed to download data blobs for sd hash {download} within timeout. +532 | DownloadMetadataTimeout | Failed to download metadata for {download} within timeout. +533 | DownloadDataTimeout | Failed to download data blobs for {download} within timeout. 534 | InvalidStreamDescriptor | {message} 535 | InvalidData | {message} 536 | InvalidBlobHash | {message} diff --git a/lbry/error/__init__.py b/lbry/error/__init__.py index 7e18f5bf98..88886487eb 100644 --- a/lbry/error/__init__.py +++ b/lbry/error/__init__.py @@ -411,18 +411,18 @@ def __init__(self): super().__init__("Download was canceled.") -class DownloadSDTimeoutError(BlobError): +class DownloadMetadataTimeoutError(BlobError): def __init__(self, download): self.download = download - super().__init__(f"Failed to download sd blob {download} within timeout.") + super().__init__(f"Failed to download metadata for {download} within timeout.") class DownloadDataTimeoutError(BlobError): def __init__(self, download): self.download = download - super().__init__(f"Failed to download data blobs for sd hash {download} within timeout.") + super().__init__(f"Failed to download data blobs for {download} within timeout.") class InvalidStreamDescriptorError(BlobError): diff --git a/lbry/extras/daemon/daemon.py b/lbry/extras/daemon/daemon.py index ac8782d7f1..f6f725b327 100644 --- a/lbry/extras/daemon/daemon.py +++ b/lbry/extras/daemon/daemon.py @@ -36,7 +36,7 @@ from lbry.blob_exchange.downloader import download_blob from lbry.dht.peer import make_kademlia_peer from lbry.error import ( - DownloadSDTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError, + DownloadMetadataTimeoutError, ComponentsNotStartedError, ComponentStartConditionNotMetError, CommandDoesNotExistError, BaseError, WalletNotFoundError, WalletAlreadyLoadedError, WalletAlreadyExistsError, ConflictingInputValueError, AlreadyPurchasedError, PrivateKeyNotFoundError, InputStringIsBlankError, InputValueError @@ -1140,7 +1140,7 @@ async def jsonrpc_get( save_file=save_file, wallet=wallet ) if not stream: - raise DownloadSDTimeoutError(uri) + raise DownloadMetadataTimeoutError(uri) except Exception as e: # TODO: use error from lbry.error log.warning("Error downloading %s: %s", uri, str(e)) diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 9cf666a81c..9e2ae510f8 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -3,7 +3,7 @@ import typing from typing import Optional from aiohttp.web import Request -from lbry.error import ResolveError, DownloadSDTimeoutError, InsufficientFundsError +from lbry.error import ResolveError, DownloadMetadataTimeoutError, InsufficientFundsError from lbry.error import ResolveTimeoutError, DownloadDataTimeoutError, KeyFeeAboveMaxAllowedError from lbry.error import InvalidStreamURLError from lbry.stream.managed_stream import ManagedStream @@ -247,10 +247,10 @@ async def download_from_uri(self, uri, exchange_rate_manager: 'ExchangeRateManag await asyncio.wait_for(stream.save_file(), timeout - (self.loop.time() - before_download)) return stream except asyncio.TimeoutError: - error = DownloadDataTimeoutError(stream.sd_hash) + error = DownloadDataTimeoutError(stream.identifier) raise error except Exception as err: # forgive data timeout, don't delete stream - expected = (DownloadSDTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, + expected = (DownloadMetadataTimeoutError, DownloadDataTimeoutError, InsufficientFundsError, KeyFeeAboveMaxAllowedError, ResolveError, InvalidStreamURLError) if isinstance(err, expected): log.warning("Failed to download %s: %s", uri, str(err)) diff --git a/lbry/stream/downloader.py b/lbry/stream/downloader.py index 39e24b37e9..cf79239121 100644 --- a/lbry/stream/downloader.py +++ b/lbry/stream/downloader.py @@ -4,7 +4,7 @@ import binascii from lbry.dht.node import get_kademlia_peers_from_hosts -from lbry.error import DownloadSDTimeoutError +from lbry.error import DownloadMetadataTimeoutError from lbry.utils import lru_cache_concurrent from lbry.stream.descriptor import StreamDescriptor from lbry.blob_exchange.downloader import BlobDownloader @@ -77,7 +77,7 @@ async def load_descriptor(self, connection_id: int = 0): log.info("downloaded sd blob %s", self.sd_hash) self.time_to_descriptor = self.loop.time() - now except asyncio.TimeoutError: - raise DownloadSDTimeoutError(self.sd_hash) + raise DownloadMetadataTimeoutError(self.sd_hash) # parse the descriptor self.descriptor = await StreamDescriptor.from_stream_descriptor_blob( diff --git a/lbry/stream/managed_stream.py b/lbry/stream/managed_stream.py index b089628201..3c57cefd58 100644 --- a/lbry/stream/managed_stream.py +++ b/lbry/stream/managed_stream.py @@ -5,7 +5,7 @@ import logging from typing import Optional from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable -from lbry.error import DownloadSDTimeoutError +from lbry.error import DownloadMetadataTimeoutError from lbry.schema.mime_types import guess_media_type from lbry.stream.downloader import StreamDownloader from lbry.stream.descriptor import StreamDescriptor, sanitize_file_name @@ -160,7 +160,7 @@ async def start(self, timeout: Optional[float] = None, await asyncio.wait_for(self.downloader.start(), timeout) except asyncio.TimeoutError: self._running.clear() - raise DownloadSDTimeoutError(self.sd_hash) + raise DownloadMetadataTimeoutError(self.identifier) if self.delayed_stop_task and not self.delayed_stop_task.done(): self.delayed_stop_task.cancel() diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 35e59f5a12..b4b52bdb04 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -7,6 +7,7 @@ from typing import Optional from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable +from lbry.error import DownloadMetadataTimeoutError from lbry.file.source_manager import SourceManager from lbry.file.source import ManagedDownloadSource from lbry.schema.mime_types import guess_media_type @@ -57,7 +58,12 @@ def mime_type(self) -> Optional[str]: return guess_media_type(os.path.basename(self.full_path))[0] async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): - await self.torrent_session.add_torrent(self.identifier, self.download_directory) + try: + metadata_download = self.torrent_session.add_torrent(self.identifier, self.download_directory) + await asyncio.wait_for(metadata_download, timeout, loop=self.loop) + except asyncio.TimeoutError: + self.torrent_session.remove_torrent(btih=self.identifier) + raise DownloadMetadataTimeoutError(self.identifier) self.download_directory = self.torrent_session.save_path(self.identifier) self._file_name = Path(self.torrent_session.full_path(self.identifier)).name await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index ba6d8dbc82..26c961b621 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -11,7 +11,7 @@ from lbry.testcase import get_fake_exchange_rate_manager from lbry.utils import generate_id from lbry.error import InsufficientFundsError -from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadSDTimeoutError, DownloadDataTimeoutError +from lbry.error import KeyFeeAboveMaxAllowedError, ResolveError, DownloadMetadataTimeoutError, DownloadDataTimeoutError from lbry.wallet import WalletManager, Wallet, Ledger, Transaction, Input, Output, Database from lbry.wallet.constants import CENT, NULL_HASH32 from lbry.wallet.network import ClientSession @@ -232,7 +232,7 @@ def check_post(event): event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.' ) - await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError, after_setup=after_setup) + await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError, after_setup=after_setup) async def test_override_fixed_peer_delay_dht_disabled(self): self.client_config.fixed_peers = [(self.server_from_client.address, self.server_from_client.tcp_port)] @@ -266,7 +266,7 @@ async def test_no_peers_timeout(self): def check_post(event): self.assertEqual(event['event'], 'Time To First Bytes') - self.assertEqual(event['properties']['error'], 'DownloadSDTimeoutError') + self.assertEqual(event['properties']['error'], 'DownloadMetadataTimeoutError') self.assertEqual(event['properties']['tried_peers_count'], 0) self.assertEqual(event['properties']['active_peer_count'], 0) self.assertFalse(event['properties']['use_fixed_peers']) @@ -277,7 +277,7 @@ def check_post(event): ) start = self.loop.time() - await self._test_time_to_first_bytes(check_post, DownloadSDTimeoutError) + await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError) duration = self.loop.time() - start self.assertLessEqual(duration, 5) self.assertGreaterEqual(duration, 3.0) @@ -387,7 +387,7 @@ async def test_download_sd_timeout(self): self.server.stop_server() await self.setup_stream_manager() await self._test_download_error_analytics_on_start( - DownloadSDTimeoutError, f'Failed to download sd blob {self.sd_hash} within timeout.', timeout=1 + DownloadMetadataTimeoutError, f'Failed to download sd blob {self.sd_hash} within timeout.', timeout=1 ) async def test_download_data_timeout(self): From b39971bf05240409842b61bdd5e287156c24c425 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Wed, 12 Oct 2022 15:22:36 -0300 Subject: [PATCH 18/33] fix tests for changed error msg --- tests/integration/datanetwork/test_file_commands.py | 4 ++-- tests/unit/stream/test_stream_manager.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index fdcdff3559..d4edb49368 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -369,12 +369,12 @@ async def test_download_different_timeouts(self): await self.server.blob_manager.delete_blobs(all_except_sd) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True) self.assertIn('error', resp) - self.assertEqual('Failed to download data blobs for sd hash %s within timeout.' % sd_hash, resp['error']) + self.assertEqual('Failed to download data blobs for %s within timeout.' % sd_hash, resp['error']) self.assertTrue(await self.daemon.jsonrpc_file_delete(claim_name='foo'), "data timeout didn't create a file") await self.server.blob_manager.delete_blobs([sd_hash]) resp = await self.daemon.jsonrpc_get('lbry://foo', timeout=2, save_file=True) self.assertIn('error', resp) - self.assertEqual('Failed to download sd blob %s within timeout.' % sd_hash, resp['error']) + self.assertEqual('Failed to download metadata for %s within timeout.' % sd_hash, resp['error']) async def wait_files_to_complete(self): while await self.file_list(status='running'): diff --git a/tests/unit/stream/test_stream_manager.py b/tests/unit/stream/test_stream_manager.py index 26c961b621..48db7cf55f 100644 --- a/tests/unit/stream/test_stream_manager.py +++ b/tests/unit/stream/test_stream_manager.py @@ -229,7 +229,7 @@ def check_post(event): self.assertFalse(event['properties']['added_fixed_peers']) self.assertEqual(event['properties']['connection_failures_count'], 1) self.assertEqual( - event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.' + event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.' ) await self._test_time_to_first_bytes(check_post, DownloadMetadataTimeoutError, after_setup=after_setup) @@ -273,7 +273,7 @@ def check_post(event): self.assertFalse(event['properties']['added_fixed_peers']) self.assertIsNone(event['properties']['fixed_peer_delay']) self.assertEqual( - event['properties']['error_message'], f'Failed to download sd blob {self.sd_hash} within timeout.' + event['properties']['error_message'], f'Failed to download metadata for {self.sd_hash} within timeout.' ) start = self.loop.time() @@ -387,7 +387,7 @@ async def test_download_sd_timeout(self): self.server.stop_server() await self.setup_stream_manager() await self._test_download_error_analytics_on_start( - DownloadMetadataTimeoutError, f'Failed to download sd blob {self.sd_hash} within timeout.', timeout=1 + DownloadMetadataTimeoutError, f'Failed to download metadata for {self.sd_hash} within timeout.', timeout=1 ) async def test_download_data_timeout(self): @@ -396,7 +396,7 @@ async def test_download_data_timeout(self): head_blob_hash = json.loads(sdf.read())['blobs'][0]['blob_hash'] self.server_blob_manager.delete_blob(head_blob_hash) await self._test_download_error_analytics_on_start( - DownloadDataTimeoutError, f'Failed to download data blobs for sd hash {self.sd_hash} within timeout.', timeout=1 + DownloadDataTimeoutError, f'Failed to download data blobs for {self.sd_hash} within timeout.', timeout=1 ) async def test_unexpected_error(self): From 77d2c81a301225456721082fac1824f079f5440a Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 27 Oct 2022 20:10:00 -0300 Subject: [PATCH 19/33] fix missing added_on for torrent files --- lbry/file/source.py | 3 ++- tests/integration/datanetwork/test_file_commands.py | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lbry/file/source.py b/lbry/file/source.py index 519327629e..f8bcbdd82f 100644 --- a/lbry/file/source.py +++ b/lbry/file/source.py @@ -1,5 +1,6 @@ import os import asyncio +import time import typing import logging import binascii @@ -43,7 +44,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: ' self.rowid = rowid self.content_fee = content_fee self.purchase_receipt = None - self._added_on = added_on + self._added_on = added_on or int(time.time()) self.analytics_manager = analytics_manager self.downloader = None diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index d4edb49368..32647f5d54 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -1,3 +1,4 @@ +import time import unittest from unittest import skipIf import asyncio @@ -81,7 +82,9 @@ async def test_download_torrent(self): # second call, see its there and move on self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) - self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, btih) + file = (await self.daemon.jsonrpc_file_list())['items'][0] + self.assertEqual(btih, file.identifier) + self.assertAlmostEqual(time.time(), file.added_on, delta=2) self.assertIn(btih, self.client_session._handles) # stream over streaming API (full range of the largest file) From 2bf0ca6441f94bf2a21b592a3d994276f4041857 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 27 Oct 2022 20:34:33 -0300 Subject: [PATCH 20/33] fix mime_type for torrent on json encoder --- lbry/extras/daemon/json_response_encoder.py | 3 +-- tests/integration/datanetwork/test_file_commands.py | 7 ++++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index bb4aefc41b..cda5617156 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -296,7 +296,7 @@ def encode_file(self, managed_stream): 'stream_name': None, 'suggested_file_name': None, 'sd_hash': None, - 'mime_type': None, + 'mime_type': managed_stream.mime_type, 'key': None, 'total_bytes_lower_bound': total_bytes_lower_bound, 'total_bytes': total_bytes, @@ -330,7 +330,6 @@ def encode_file(self, managed_stream): 'stream_name': managed_stream.stream_name, 'suggested_file_name': managed_stream.suggested_file_name, 'sd_hash': managed_stream.descriptor.sd_hash, - 'mime_type': managed_stream.mime_type, 'key': managed_stream.descriptor.key, 'blobs_completed': managed_stream.blobs_completed, 'blobs_in_stream': managed_stream.blobs_in_stream, diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 32647f5d54..93d06f4995 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -82,9 +82,10 @@ async def test_download_torrent(self): # second call, see its there and move on self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) - file = (await self.daemon.jsonrpc_file_list())['items'][0] - self.assertEqual(btih, file.identifier) - self.assertAlmostEqual(time.time(), file.added_on, delta=2) + file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0] + self.assertEqual(btih, file['metadata']['source']['bt_infohash']) + self.assertAlmostEqual(time.time(), file['added_on'], delta=2) + self.assertEqual("application/octet-stream", file['mime_type']) self.assertIn(btih, self.client_session._handles) # stream over streaming API (full range of the largest file) From 732b7e79d758084fd234a89b46b2057f2cb838f0 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 27 Oct 2022 20:38:21 -0300 Subject: [PATCH 21/33] fix suggested_file_name for torrent on json encoder --- lbry/extras/daemon/json_response_encoder.py | 3 +-- lbry/file/source.py | 4 ++++ tests/integration/datanetwork/test_file_commands.py | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index cda5617156..777154f6ba 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -294,7 +294,7 @@ def encode_file(self, managed_stream): 'stopped': not managed_stream.running, 'stream_hash': None, 'stream_name': None, - 'suggested_file_name': None, + 'suggested_file_name': managed_stream.suggested_file_name, 'sd_hash': None, 'mime_type': managed_stream.mime_type, 'key': None, @@ -328,7 +328,6 @@ def encode_file(self, managed_stream): result.update({ 'stream_hash': managed_stream.stream_hash, 'stream_name': managed_stream.stream_name, - 'suggested_file_name': managed_stream.suggested_file_name, 'sd_hash': managed_stream.descriptor.sd_hash, 'key': managed_stream.descriptor.key, 'blobs_completed': managed_stream.blobs_completed, diff --git a/lbry/file/source.py b/lbry/file/source.py index f8bcbdd82f..b44576dd22 100644 --- a/lbry/file/source.py +++ b/lbry/file/source.py @@ -92,6 +92,10 @@ def file_name(self) -> Optional[str]: def added_on(self) -> Optional[int]: return self._added_on + @property + def suggested_file_name(self): + return self._file_name + @property def status(self) -> str: return self._status diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 93d06f4995..25d87166d4 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -86,6 +86,7 @@ async def test_download_torrent(self): self.assertEqual(btih, file['metadata']['source']['bt_infohash']) self.assertAlmostEqual(time.time(), file['added_on'], delta=2) self.assertEqual("application/octet-stream", file['mime_type']) + self.assertEqual("tmp1", file['suggested_file_name']) self.assertIn(btih, self.client_session._handles) # stream over streaming API (full range of the largest file) From 31c6e0e83523ec5840b1e091d1c9df1133216485 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 27 Oct 2022 20:40:49 -0300 Subject: [PATCH 22/33] fix stream_name for torrent on json encoder --- lbry/extras/daemon/json_response_encoder.py | 3 +-- lbry/file/source.py | 4 ++++ tests/integration/datanetwork/test_file_commands.py | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lbry/extras/daemon/json_response_encoder.py b/lbry/extras/daemon/json_response_encoder.py index 777154f6ba..7b5f60840b 100644 --- a/lbry/extras/daemon/json_response_encoder.py +++ b/lbry/extras/daemon/json_response_encoder.py @@ -293,7 +293,7 @@ def encode_file(self, managed_stream): 'points_paid': 0.0, 'stopped': not managed_stream.running, 'stream_hash': None, - 'stream_name': None, + 'stream_name': managed_stream.stream_name, 'suggested_file_name': managed_stream.suggested_file_name, 'sd_hash': None, 'mime_type': managed_stream.mime_type, @@ -327,7 +327,6 @@ def encode_file(self, managed_stream): if is_stream: result.update({ 'stream_hash': managed_stream.stream_hash, - 'stream_name': managed_stream.stream_name, 'sd_hash': managed_stream.descriptor.sd_hash, 'key': managed_stream.descriptor.key, 'blobs_completed': managed_stream.blobs_completed, diff --git a/lbry/file/source.py b/lbry/file/source.py index b44576dd22..63ddc6a40f 100644 --- a/lbry/file/source.py +++ b/lbry/file/source.py @@ -96,6 +96,10 @@ def added_on(self) -> Optional[int]: def suggested_file_name(self): return self._file_name + @property + def stream_name(self): + return self.suggested_file_name + @property def status(self) -> str: return self._status diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 25d87166d4..7c1178f0e7 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -87,6 +87,7 @@ async def test_download_torrent(self): self.assertAlmostEqual(time.time(), file['added_on'], delta=2) self.assertEqual("application/octet-stream", file['mime_type']) self.assertEqual("tmp1", file['suggested_file_name']) + self.assertEqual("tmp1", file['stream_name']) self.assertIn(btih, self.client_session._handles) # stream over streaming API (full range of the largest file) From 651348f6e0faa3d3b342a1f2ec63fd4554dbef4f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 28 Oct 2022 01:26:35 -0300 Subject: [PATCH 23/33] fix status for completed torrents --- lbry/torrent/torrent_manager.py | 4 ++++ .../integration/datanetwork/test_file_commands.py | 15 +++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index b4b52bdb04..96d4d50a15 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -104,6 +104,10 @@ def stop_tasks(self): def completed(self): return self.torrent_session.is_completed(self.identifier) + @property + def status(self): + return self.STATUS_FINISHED if self.completed else self.STATUS_RUNNING + async def stream_file(self, request): log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id, self.identifier[:6]) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 7c1178f0e7..c76172c95e 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -82,16 +82,23 @@ async def test_download_torrent(self): # second call, see its there and move on self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + self.assertIn(btih, self.client_session._handles) + + # stream over streaming API (full range of the largest file) + await self.assert_torrent_streaming_works(btih) + + # check json encoder fields for torrent sources file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0] self.assertEqual(btih, file['metadata']['source']['bt_infohash']) self.assertAlmostEqual(time.time(), file['added_on'], delta=2) self.assertEqual("application/octet-stream", file['mime_type']) self.assertEqual("tmp1", file['suggested_file_name']) self.assertEqual("tmp1", file['stream_name']) - self.assertIn(btih, self.client_session._handles) - - # stream over streaming API (full range of the largest file) - await self.assert_torrent_streaming_works(btih) + self.assertTrue(file['completed']) + self.assertGreater(file['total_bytes_lower_bound'], 0) + self.assertEqual(file['total_bytes_lower_bound'], file['total_bytes']) + self.assertEqual(file['total_bytes'], file['written_bytes']) + self.assertEqual('finished', file['status']) tx, new_btih = await self.initialize_torrent(tx) self.assertNotEqual(btih, new_btih) From 1041a19467669cee07c9058c3b8b3182619a9753 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 28 Oct 2022 11:35:33 -0300 Subject: [PATCH 24/33] deserialize torrent fields properly --- lbry/torrent/torrent_manager.py | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 96d4d50a15..3d24b98af5 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -1,5 +1,4 @@ import asyncio -import binascii import logging import os import typing @@ -23,12 +22,6 @@ log = logging.getLogger(__name__) -def path_or_none(encoded_path) -> Optional[str]: - if not encoded_path: - return - return binascii.unhexlify(encoded_path).decode() - - class TorrentSource(ManagedDownloadSource): STATUS_STOPPED = "stopped" filter_fields = SourceManager.filter_fields @@ -185,6 +178,8 @@ async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[s async def initialize_from_database(self): for file in await self.storage.get_all_torrent_files(): claim = await self.storage.get_content_claim_for_torrent(file['bt_infohash']) + file['download_directory'] = bytes.fromhex(file['download_directory'] or '').decode() or None + file['file_name'] = bytes.fromhex(file['file_name'] or '').decode() or None await self._load_stream(None, claim=claim, **file) async def start(self): From 39da718c2808e5d51afb89ad30b87a6ccf2952f1 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 28 Oct 2022 18:04:35 -0300 Subject: [PATCH 25/33] remove dead code --- lbry/torrent/torrent_manager.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index 3d24b98af5..b6ac5ed3d0 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -199,9 +199,6 @@ async def create(self, file_path: str, key: Optional[bytes] = None, async def _delete(self, source: ManagedDownloadSource, delete_file: Optional[bool] = False): raise NotImplementedError - # blob_hashes = [source.sd_hash] + [b.blob_hash for b in source.descriptor.blobs[:-1]] - # await self.blob_manager.delete_blobs(blob_hashes, delete_from_db=False) - # await self.storage.delete_stream(source.descriptor) async def stream_partial_content(self, request: Request, identifier: str): return await self._sources[identifier].stream_file(request) From 8ce53069ad8fbca400f9741ca42280ef374bb88f Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 28 Oct 2022 18:32:05 -0300 Subject: [PATCH 26/33] fix filtering for fields missing on torrents --- lbry/file/file_manager.py | 17 ++++++++++------- lbry/stream/stream_manager.py | 2 +- .../datanetwork/test_file_commands.py | 3 +++ 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/lbry/file/file_manager.py b/lbry/file/file_manager.py index 9e2ae510f8..d2f7dc399a 100644 --- a/lbry/file/file_manager.py +++ b/lbry/file/file_manager.py @@ -297,14 +297,17 @@ async def stream_partial_content(self, request: Request, identifier: str): def get_filtered(self, *args, **kwargs) -> typing.List[ManagedDownloadSource]: """ - Get a list of filtered and sorted ManagedStream objects - - :param sort_by: field to sort by - :param reverse: reverse sorting - :param comparison: comparison operator used for filtering - :param search_by: fields and values to filter by + Get a list of filtered and sorted ManagedDownloadSource objects from all available source managers """ - return sum((manager.get_filtered(*args, **kwargs) for manager in self.source_managers.values()), []) + result = last_error = None + for manager in self.source_managers.values(): + try: + result = (result or []) + manager.get_filtered(*args, **kwargs) + except ValueError as error: + last_error = error + if result is not None: + return result + raise last_error async def delete(self, source: ManagedDownloadSource, delete_file=False): for manager in self.source_managers.values(): diff --git a/lbry/stream/stream_manager.py b/lbry/stream/stream_manager.py index 7ecf7e442d..1fda2e4631 100644 --- a/lbry/stream/stream_manager.py +++ b/lbry/stream/stream_manager.py @@ -32,7 +32,7 @@ def path_or_none(encoded_path) -> Optional[str]: class StreamManager(SourceManager): _sources: typing.Dict[str, ManagedStream] - filter_fields = SourceManager.filter_fields + filter_fields = set(SourceManager.filter_fields) filter_fields.update({ 'sd_hash', 'stream_hash', diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index c76172c95e..2f7ccd21af 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -100,6 +100,9 @@ async def test_download_torrent(self): self.assertEqual(file['total_bytes'], file['written_bytes']) self.assertEqual('finished', file['status']) + # filter by a field which is missing on torrent + self.assertItemCount(await self.daemon.jsonrpc_file_list(stream_hash="abc"), 0) + tx, new_btih = await self.initialize_torrent(tx) self.assertNotEqual(btih, new_btih) # claim now points to another torrent, update to it From 2bea8f58e0746a7385b393d341d0c0d332483401 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 28 Oct 2022 19:44:00 -0300 Subject: [PATCH 27/33] fix duplicated file entry on startup --- lbry/torrent/torrent_manager.py | 7 +++++-- tests/integration/datanetwork/test_file_commands.py | 2 ++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index b6ac5ed3d0..ce873b07cf 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -50,7 +50,7 @@ def full_path(self) -> Optional[str]: def mime_type(self) -> Optional[str]: return guess_media_type(os.path.basename(self.full_path))[0] - async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): + async def setup(self, timeout: Optional[float] = None): try: metadata_download = self.torrent_session.add_torrent(self.identifier, self.download_directory) await asyncio.wait_for(metadata_download, timeout, loop=self.loop) @@ -59,6 +59,9 @@ async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] raise DownloadMetadataTimeoutError(self.identifier) self.download_directory = self.torrent_session.save_path(self.identifier) self._file_name = Path(self.torrent_session.full_path(self.identifier)).name + + async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): + await self.setup(timeout) await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) self.rowid = await self.storage.save_downloaded_file( self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on @@ -173,7 +176,7 @@ async def _load_stream(self, rowid: int, bt_infohash: str, file_name: Optional[s torrent_session=self.torrent_session ) self.add(stream) - await stream.start() + await stream.setup() async def initialize_from_database(self): for file in await self.storage.get_all_torrent_files(): diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 2f7ccd21af..e259041b92 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -115,6 +115,8 @@ async def test_download_torrent(self): await self.daemon.file_manager.start() self.assertEqual((await self.daemon.jsonrpc_file_list())['items'][0].identifier, new_btih) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) + # check it was saved properly, once + self.assertEqual(1, len(await self.daemon.storage.get_all_torrent_files())) self.assertIn(new_btih, self.client_session._handles) self.assertNotIn(btih, self.client_session._handles) From 9dc617f8e0500003eb51bb8f29e0c1e67e8250c6 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Fri, 28 Oct 2022 19:44:40 -0300 Subject: [PATCH 28/33] use a non-default port for streaming test so it can run with a live instance --- tests/integration/datanetwork/test_file_commands.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index e259041b92..49f7b8f335 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -21,6 +21,7 @@ class FileCommands(CommandTestCase): def __init__(self, *a, **kw): super().__init__(*a, **kw) self.skip_libtorrent = False + self.streaming_port = 60818 async def add_forever(self): while True: @@ -56,11 +57,11 @@ async def initialize_torrent(self, tx_to_update=None): return tx, btih async def assert_torrent_streaming_works(self, btih): - url = f'http://{self.daemon.conf.streaming_host}:{self.daemon.conf.streaming_port}/get/torrent' + url = f'http://{self.daemon.conf.streaming_host}:{self.streaming_port}/get/torrent' if self.daemon.streaming_runner.server is None: await self.daemon.streaming_runner.setup() site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host, - self.daemon.conf.streaming_port) + self.streaming_port) await site.start() async with aiohttp_request('get', url) as req: self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream') From 5cf63fa03eee7b9215a8c3e66b8f6d1ce85a181b Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Tue, 1 Nov 2022 17:59:49 -0300 Subject: [PATCH 29/33] restore torrent rowid on restart --- lbry/extras/daemon/storage.py | 3 ++- lbry/torrent/torrent_manager.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lbry/extras/daemon/storage.py b/lbry/extras/daemon/storage.py index a36f7e9698..65e7cfcd19 100644 --- a/lbry/extras/daemon/storage.py +++ b/lbry/extras/daemon/storage.py @@ -638,7 +638,8 @@ def get_all_lbry_files(self) -> typing.Awaitable[typing.List[typing.Dict]]: async def get_all_torrent_files(self) -> typing.List[typing.Dict]: def _get_all_torrent_files(transaction): - cursor = transaction.execute("select * from file join torrent on file.bt_infohash=torrent.bt_infohash") + cursor = transaction.execute( + "select file.ROWID as rowid, * from file join torrent on file.bt_infohash=torrent.bt_infohash") return map(lambda row: dict(zip(list(map(itemgetter(0), cursor.description)), row)), cursor.fetchall()) return list(await self.db.run(_get_all_torrent_files)) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index ce873b07cf..a6e9c89419 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -183,7 +183,7 @@ async def initialize_from_database(self): claim = await self.storage.get_content_claim_for_torrent(file['bt_infohash']) file['download_directory'] = bytes.fromhex(file['download_directory'] or '').decode() or None file['file_name'] = bytes.fromhex(file['file_name'] or '').decode() or None - await self._load_stream(None, claim=claim, **file) + await self._load_stream(claim=claim, **file) async def start(self): await super().start() From 9d869820a3776d02b655ea2b3fa969139ecfd2c5 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 5 Nov 2022 00:54:41 -0300 Subject: [PATCH 30/33] test picking file from claim file name --- .../datanetwork/test_file_commands.py | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/tests/integration/datanetwork/test_file_commands.py b/tests/integration/datanetwork/test_file_commands.py index 49f7b8f335..5b672e7c28 100644 --- a/tests/integration/datanetwork/test_file_commands.py +++ b/tests/integration/datanetwork/test_file_commands.py @@ -22,62 +22,59 @@ def __init__(self, *a, **kw): super().__init__(*a, **kw) self.skip_libtorrent = False self.streaming_port = 60818 + self.seeder_session = None - async def add_forever(self): - while True: - for handle in self.client_session._handles.values(): - handle._handle.connect_peer(('127.0.0.1', 4040)) - await asyncio.sleep(.1) - - async def initialize_torrent(self, tx_to_update=None): - if not hasattr(self, 'seeder_session'): + async def initialize_torrent(self, tx_to_update=None, pick_a_file=True, name=None): + assert name is None or tx_to_update is None + if not self.seeder_session: self.seeder_session = TorrentSession(self.loop, None) self.addCleanup(self.seeder_session.stop) await self.seeder_session.bind('127.0.0.1', port=4040) btih = await self.seeder_session.add_fake_torrent(file_count=3) + files = [(size, path) for (path, size) in self.seeder_session.get_files(btih).items()] + files.sort() + # picking a file will pick something in the middle, while automatic selection will pick largest + self.expected_size, self.expected_path = files[1] if pick_a_file else files[-1] + address = await self.account.receiving.get_or_create_usable_address() + claim = tx_to_update.outputs[0].claim if tx_to_update else Claim() + claim.stream.update(bt_infohash=btih) + if pick_a_file: + claim.stream.source.name = os.path.basename(self.expected_path) if not tx_to_update: - claim = Claim() - claim.stream.update(bt_infohash=btih) tx = await Transaction.claim_create( - 'torrent', claim, 1, address, [self.account], self.account + name or 'torrent', claim, 1, address, [self.account], self.account ) else: - claim = tx_to_update.outputs[0].claim - claim.stream.update(bt_infohash=btih) tx = await Transaction.claim_update( tx_to_update.outputs[0], claim, 1, address, [self.account], self.account ) await tx.sign([self.account]) await self.broadcast_and_confirm(tx) self.client_session = self.daemon.file_manager.source_managers['torrent'].torrent_session - self.client_session.wait_start = False # fixme: this is super slow on tests - task = asyncio.create_task(self.add_forever()) - self.addCleanup(task.cancel) return tx, btih async def assert_torrent_streaming_works(self, btih): - url = f'http://{self.daemon.conf.streaming_host}:{self.streaming_port}/get/torrent' + url = f'http://{self.daemon.conf.streaming_host}:{self.streaming_port}/stream/{btih}' if self.daemon.streaming_runner.server is None: await self.daemon.streaming_runner.setup() site = aiohttp.web.TCPSite(self.daemon.streaming_runner, self.daemon.conf.streaming_host, self.streaming_port) await site.start() async with aiohttp_request('get', url) as req: + self.assertEqual(req.status, 206) self.assertEqual(req.headers.get('Content-Type'), 'application/octet-stream') content_range = req.headers.get('Content-Range') content_length = int(req.headers.get('Content-Length')) streamed_bytes = await req.content.read() - expected_size = os.path.getsize(self.seeder_session.full_path(btih)) + expected_size = self.expected_size self.assertEqual(expected_size, len(streamed_bytes)) self.assertEqual(content_length, len(streamed_bytes)) self.assertEqual(f"bytes 0-{expected_size - 1}/{expected_size}", content_range) - self.assertEqual(len(streamed_bytes), max(self.seeder_session.get_files(btih).values())) - @skipIf(TorrentSession is None, "libtorrent not installed") async def test_download_torrent(self): - tx, btih = await self.initialize_torrent() + tx, btih = await self.initialize_torrent(pick_a_file=False) self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent'))) self.assertItemCount(await self.daemon.jsonrpc_file_list(), 1) # second call, see its there and move on @@ -91,10 +88,13 @@ async def test_download_torrent(self): # check json encoder fields for torrent sources file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0] self.assertEqual(btih, file['metadata']['source']['bt_infohash']) - self.assertAlmostEqual(time.time(), file['added_on'], delta=2) + self.assertAlmostEqual(time.time(), file['added_on'], delta=12) self.assertEqual("application/octet-stream", file['mime_type']) - self.assertEqual("tmp1", file['suggested_file_name']) - self.assertEqual("tmp1", file['stream_name']) + self.assertEqual(os.path.basename(self.expected_path), file['suggested_file_name']) + self.assertEqual(os.path.basename(self.expected_path), file['stream_name']) + while not file['completed']: # improve that + await asyncio.sleep(0.5) + file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0] self.assertTrue(file['completed']) self.assertGreater(file['total_bytes_lower_bound'], 0) self.assertEqual(file['total_bytes_lower_bound'], file['total_bytes']) @@ -126,6 +126,11 @@ async def test_download_torrent(self): self.assertItemCount(await self.daemon.jsonrpc_file_list(), 0) self.assertNotIn(new_btih, self.client_session._handles) + await self.initialize_torrent(name='torrent2') + self.assertNotIn('error', await self.out(self.daemon.jsonrpc_get('torrent2'))) + file = (await self.out(self.daemon.jsonrpc_file_list()))['items'][0] + self.assertEqual(os.path.basename(self.expected_path), file['stream_name']) + async def create_streams_in_range(self, *args, **kwargs): self.stream_claim_ids = [] for i in range(*args, **kwargs): From 64aad14ba6e26e9d4e44de873dda7299d1f4c782 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 5 Nov 2022 00:55:38 -0300 Subject: [PATCH 31/33] pick file from file name, fallback to largest --- lbry/torrent/session.py | 77 ++++++++++++++++++--------------- lbry/torrent/torrent_manager.py | 46 +++++++++++++++----- 2 files changed, 77 insertions(+), 46 deletions(-) diff --git a/lbry/torrent/session.py b/lbry/torrent/session.py index a8308d4499..abdbff27b5 100644 --- a/lbry/torrent/session.py +++ b/lbry/torrent/session.py @@ -23,7 +23,6 @@ def __init__(self, loop, executor, handle): self._loop = loop self._executor = executor self._handle: libtorrent.torrent_handle = handle - self.started = asyncio.Event(loop=loop) self.finished = asyncio.Event(loop=loop) self.metadata_completed = asyncio.Event(loop=loop) self.size = handle.status().total_wanted @@ -37,12 +36,14 @@ def __init__(self, loop, executor, handle): def torrent_file(self) -> Optional[libtorrent.file_storage]: return self._torrent_info.files() - @property - def largest_file(self) -> Optional[str]: + def full_path_at(self, file_num) -> Optional[str]: if self.torrent_file is None: return None - index = self.largest_file_index - return os.path.join(self.save_path, self.torrent_file.file_path(index)) + return os.path.join(self.save_path, self.torrent_file.file_path(file_num)) + + def size_at(self, file_num) -> Optional[int]: + if self.torrent_file is not None: + return self.torrent_file.file_size(file_num) @property def save_path(self) -> Optional[str]: @@ -50,16 +51,12 @@ def save_path(self) -> Optional[str]: self._base_path = self._handle.status().save_path return self._base_path - @property - def largest_file_index(self): - largest_size, index = 0, 0 + def index_from_name(self, file_name): for file_num in range(self.torrent_file.num_files()): if '.pad' in self.torrent_file.file_path(file_num): continue # ignore padding files - if self.torrent_file.file_size(file_num) > largest_size: - largest_size = self.torrent_file.file_size(file_num) - index = file_num - return index + if file_name == os.path.basename(self.full_path_at(file_num)): + return file_num def stop_tasks(self): self._handle.save_resume_data() @@ -72,14 +69,16 @@ def byte_range_to_piece_range( end_piece = self._torrent_info.map_file(file_index, end_offset, 0) return start_piece, end_piece - async def stream_range_as_completed(self, file_index, start, end): + async def stream_range_as_completed(self, file_name, start, end): + file_index = self.index_from_name(file_name) + if file_index is None: + raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}") first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end) start_piece_offset = first_piece.start piece_size = self._torrent_info.piece_length() log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s", first_piece.piece, final_piece.piece, start, end, piece_size, self.name) self.prioritize(file_index, start, end) - await self.resume() for piece_index in range(first_piece.piece, final_piece.piece + 1): while not self._handle.have_piece(piece_index): log.info("Waiting for piece %d: %s", piece_index, self.name) @@ -102,13 +101,6 @@ def _show_status(self): self.metadata_completed.set() self._torrent_info = self._handle.torrent_file() log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name) - # prioritize first 2mb - self.prioritize(self.largest_file_index, 0, 2 * 1024 * 1024) - first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index) - if not self.started.is_set(): - if self._handle.have_piece(first_piece): - log.debug("Got first piece, set started - %s", self.name) - self.started.set() log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s', status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000, status.num_peers, status.num_seeds, status.state, status.save_path) @@ -150,9 +142,11 @@ def __init__(self, loop, executor): self._loop = loop self._executor = executor self._session: Optional[libtorrent.session] = None - self._handles = {} + self._handles: Dict[str, TorrentHandle] = {} self.tasks = [] - self.wait_start = True + + def add_peer(self, btih, addr, port): + self._handles[btih]._handle.connect_peer((addr, port)) async def add_fake_torrent(self, file_count=3): tmpdir = mkdtemp() @@ -180,9 +174,10 @@ def stop(self): self._handles.popitem()[1].stop_tasks() while self.tasks: self.tasks.pop().cancel() - self._session.save_state() - self._session.pause() - self._session = None + if self._session: + self._session.save_state() + self._session.pause() + self._session = None def _pop_alerts(self): for alert in self._session.pop_alerts(): @@ -216,21 +211,23 @@ def _add_torrent(self, btih: str, download_directory: Optional[str]): handle.force_dht_announce() self._handles[btih] = TorrentHandle(self._loop, self._executor, handle) - def full_path(self, btih): - return self._handles[btih].largest_file + def full_path(self, btih, file_num) -> Optional[str]: + return self._handles[btih].full_path_at(file_num) def save_path(self, btih): return self._handles[btih].save_path + def has_torrent(self, btih): + return btih in self._handles + async def add_torrent(self, btih, download_path): + if btih in self._handles: + return await self._handles[btih].metadata_completed.wait() await self._loop.run_in_executor( self._executor, self._add_torrent, btih, download_path ) self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop())) await self._handles[btih].metadata_completed.wait() - if self.wait_start: - # fixme: temporary until we add streaming support, otherwise playback fails! - await self._handles[btih].started.wait() def remove_torrent(self, btih, remove_files=False): if btih in self._handles: @@ -243,9 +240,17 @@ async def save_file(self, btih, download_directory): handle = self._handles[btih] await handle.resume() - def get_size(self, btih): + def get_total_size(self, btih): return self._handles[btih].size + def get_index_from_name(self, btih, file_name): + return self._handles[btih].index_from_name(file_name) + + def get_size(self, btih, file_name) -> Optional[int]: + for (path, size) in self.get_files(btih).items(): + if os.path.basename(path) == file_name: + return size + def get_name(self, btih): return self._handles[btih].name @@ -255,14 +260,14 @@ def get_downloaded(self, btih): def is_completed(self, btih): return self._handles[btih].finished.is_set() - def stream_largest_file(self, btih, start, end): + def stream_file(self, btih, file_name, start, end): handle = self._handles[btih] - return handle.stream_range_as_completed(handle.largest_file_index, start, end) + return handle.stream_range_as_completed(file_name, start, end) def get_files(self, btih) -> Dict: handle = self._handles[btih] return { - handle.torrent_file.file_path(file_num): handle.torrent_file.file_size(file_num) + self.full_path(btih, file_num): handle.torrent_file.file_size(file_num) for file_num in range(handle.torrent_file.num_files()) if '.pad' not in handle.torrent_file.file_path(file_num) } @@ -302,7 +307,7 @@ async def main(): await session.bind() await session.add_torrent(btih, os.path.expanduser("~/Downloads")) while True: - session.full_path(btih) + session.full_path(btih, 0) await asyncio.sleep(1) await session.pause() executor.shutdown() diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index a6e9c89419..b2b4b3f986 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -39,12 +39,33 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: ' super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id, rowid, content_fee, analytics_manager, added_on) self.torrent_session = torrent_session + self._suggested_file_name = None + self._full_path = None @property def full_path(self) -> Optional[str]: - full_path = self.torrent_session.full_path(self.identifier) + if not self._full_path: + self._full_path = self.select_path() + self._file_name = os.path.basename(self._full_path) self.download_directory = self.torrent_session.save_path(self.identifier) - return full_path + return self._full_path + + def select_path(self): + wanted_name = (self.stream_claim_info and self.stream_claim_info.claim.stream.source.name) or '' + wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name) + if wanted_index is None: + # maybe warn? + largest = None + for (path, size) in self.torrent_session.get_files(self.identifier).items(): + largest = (path, size) if not largest or size > largest[1] else largest + return largest[0] + else: + return self.torrent_session.full_path(self.identifier, wanted_index or 0) + + @property + def suggested_file_name(self): + self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path()) + return self._suggested_file_name @property def mime_type(self) -> Optional[str]: @@ -58,14 +79,15 @@ async def setup(self, timeout: Optional[float] = None): self.torrent_session.remove_torrent(btih=self.identifier) raise DownloadMetadataTimeoutError(self.identifier) self.download_directory = self.torrent_session.save_path(self.identifier) - self._file_name = Path(self.torrent_session.full_path(self.identifier)).name + self._file_name = os.path.basename(self.full_path) async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False): await self.setup(timeout) - await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) - self.rowid = await self.storage.save_downloaded_file( - self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on - ) + if not self.rowid: + await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name) + self.rowid = await self.storage.save_downloaded_file( + self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on + ) async def stop(self, finished: bool = False): await self.torrent_session.remove_torrent(self.identifier) @@ -75,11 +97,11 @@ async def save_file(self, file_name: Optional[str] = None, download_directory: O @property def torrent_length(self): - return self.torrent_session.get_size(self.identifier) + return self.torrent_session.get_total_size(self.identifier) @property def stream_length(self): - return os.path.getsize(self.full_path) + return self.torrent_session.get_size(self.identifier, self.file_name) @property def written_bytes(self): @@ -110,15 +132,19 @@ async def stream_file(self, request): headers, start, end = self._prepare_range_response_headers( request.headers.get('range', 'bytes=0-') ) + target = self.suggested_file_name await self.start() response = StreamResponse( status=206, headers=headers ) await response.prepare(request) + while not os.path.exists(self.full_path): + async for _ in self.torrent_session.stream_file(self.identifier, target, start, end): + break with open(self.full_path, 'rb') as infile: infile.seek(start) - async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end): + async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end): if infile.tell() + read_size < end: await response.write(infile.read(read_size)) else: From 636b7ed4762867e5ec58395eed04c72275acc56d Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Sat, 5 Nov 2022 00:56:03 -0300 Subject: [PATCH 32/33] tests: enable logging lbry.torrent when verbosity changes --- lbry/testcase.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lbry/testcase.py b/lbry/testcase.py index 3ddaec4592..15a2d1806e 100644 --- a/lbry/testcase.py +++ b/lbry/testcase.py @@ -394,6 +394,7 @@ async def asyncSetUp(self): logging.getLogger('lbry.blob_exchange').setLevel(self.VERBOSITY) logging.getLogger('lbry.daemon').setLevel(self.VERBOSITY) logging.getLogger('lbry.stream').setLevel(self.VERBOSITY) + logging.getLogger('lbry.torrent').setLevel(self.VERBOSITY) logging.getLogger('lbry.wallet').setLevel(self.VERBOSITY) await super().asyncSetUp() From dbe3ace8127f1e58b96a615eba30ca3efd177e73 Mon Sep 17 00:00:00 2001 From: Victor Shyba Date: Thu, 15 Dec 2022 21:49:48 -0300 Subject: [PATCH 33/33] pylint --- lbry/torrent/torrent_manager.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lbry/torrent/torrent_manager.py b/lbry/torrent/torrent_manager.py index b2b4b3f986..8357c5033c 100644 --- a/lbry/torrent/torrent_manager.py +++ b/lbry/torrent/torrent_manager.py @@ -2,7 +2,6 @@ import logging import os import typing -from pathlib import Path from typing import Optional from aiohttp.web import Request, StreamResponse, HTTPRequestRangeNotSatisfiable @@ -51,13 +50,13 @@ def full_path(self) -> Optional[str]: return self._full_path def select_path(self): - wanted_name = (self.stream_claim_info and self.stream_claim_info.claim.stream.source.name) or '' + wanted_name = (self.stream_claim_info.claim.stream.source.name or '') if self.stream_claim_info else '' wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name) if wanted_index is None: # maybe warn? - largest = None + largest = (None, -1) for (path, size) in self.torrent_session.get_files(self.identifier).items(): - largest = (path, size) if not largest or size > largest[1] else largest + largest = (path, size) if size > largest[1] else largest return largest[0] else: return self.torrent_session.full_path(self.identifier, wanted_index or 0)