Skip to content

Commit

Permalink
pick file from file name, fallback to largest
Browse files Browse the repository at this point in the history
  • Loading branch information
shyba committed Dec 15, 2022
1 parent 9d86982 commit 64aad14
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 46 deletions.
77 changes: 41 additions & 36 deletions lbry/torrent/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ def __init__(self, loop, executor, handle):
self._loop = loop
self._executor = executor
self._handle: libtorrent.torrent_handle = handle
self.started = asyncio.Event(loop=loop)
self.finished = asyncio.Event(loop=loop)
self.metadata_completed = asyncio.Event(loop=loop)
self.size = handle.status().total_wanted
Expand All @@ -37,29 +36,27 @@ def __init__(self, loop, executor, handle):
def torrent_file(self) -> Optional[libtorrent.file_storage]:
return self._torrent_info.files()

@property
def largest_file(self) -> Optional[str]:
def full_path_at(self, file_num) -> Optional[str]:
if self.torrent_file is None:
return None
index = self.largest_file_index
return os.path.join(self.save_path, self.torrent_file.file_path(index))
return os.path.join(self.save_path, self.torrent_file.file_path(file_num))

def size_at(self, file_num) -> Optional[int]:
if self.torrent_file is not None:
return self.torrent_file.file_size(file_num)

@property
def save_path(self) -> Optional[str]:
if not self._base_path:
self._base_path = self._handle.status().save_path
return self._base_path

@property
def largest_file_index(self):
largest_size, index = 0, 0
def index_from_name(self, file_name):
for file_num in range(self.torrent_file.num_files()):
if '.pad' in self.torrent_file.file_path(file_num):
continue # ignore padding files
if self.torrent_file.file_size(file_num) > largest_size:
largest_size = self.torrent_file.file_size(file_num)
index = file_num
return index
if file_name == os.path.basename(self.full_path_at(file_num)):
return file_num

def stop_tasks(self):
self._handle.save_resume_data()
Expand All @@ -72,14 +69,16 @@ def byte_range_to_piece_range(
end_piece = self._torrent_info.map_file(file_index, end_offset, 0)
return start_piece, end_piece

async def stream_range_as_completed(self, file_index, start, end):
async def stream_range_as_completed(self, file_name, start, end):
file_index = self.index_from_name(file_name)
if file_index is None:
raise ValueError(f"Attempt to stream from invalid file. Expected name: {file_name}")
first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
start_piece_offset = first_piece.start
piece_size = self._torrent_info.piece_length()
log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d, piece size: %d): %s",
first_piece.piece, final_piece.piece, start, end, piece_size, self.name)
self.prioritize(file_index, start, end)
await self.resume()
for piece_index in range(first_piece.piece, final_piece.piece + 1):
while not self._handle.have_piece(piece_index):
log.info("Waiting for piece %d: %s", piece_index, self.name)
Expand All @@ -102,13 +101,6 @@ def _show_status(self):
self.metadata_completed.set()
self._torrent_info = self._handle.torrent_file()
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
# prioritize first 2mb
self.prioritize(self.largest_file_index, 0, 2 * 1024 * 1024)
first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index)
if not self.started.is_set():
if self._handle.have_piece(first_piece):
log.debug("Got first piece, set started - %s", self.name)
self.started.set()
log.debug('%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d seeds: %d) %s - %s',
status.progress * 100, status.download_rate / 1000, status.upload_rate / 1000,
status.num_peers, status.num_seeds, status.state, status.save_path)
Expand Down Expand Up @@ -150,9 +142,11 @@ def __init__(self, loop, executor):
self._loop = loop
self._executor = executor
self._session: Optional[libtorrent.session] = None
self._handles = {}
self._handles: Dict[str, TorrentHandle] = {}
self.tasks = []
self.wait_start = True

def add_peer(self, btih, addr, port):
self._handles[btih]._handle.connect_peer((addr, port))

async def add_fake_torrent(self, file_count=3):
tmpdir = mkdtemp()
Expand Down Expand Up @@ -180,9 +174,10 @@ def stop(self):
self._handles.popitem()[1].stop_tasks()
while self.tasks:
self.tasks.pop().cancel()
self._session.save_state()
self._session.pause()
self._session = None
if self._session:
self._session.save_state()
self._session.pause()
self._session = None

def _pop_alerts(self):
for alert in self._session.pop_alerts():
Expand Down Expand Up @@ -216,21 +211,23 @@ def _add_torrent(self, btih: str, download_directory: Optional[str]):
handle.force_dht_announce()
self._handles[btih] = TorrentHandle(self._loop, self._executor, handle)

def full_path(self, btih):
return self._handles[btih].largest_file
def full_path(self, btih, file_num) -> Optional[str]:
return self._handles[btih].full_path_at(file_num)

def save_path(self, btih):
return self._handles[btih].save_path

def has_torrent(self, btih):
return btih in self._handles

async def add_torrent(self, btih, download_path):
if btih in self._handles:
return await self._handles[btih].metadata_completed.wait()
await self._loop.run_in_executor(
self._executor, self._add_torrent, btih, download_path
)
self._handles[btih].tasks.append(self._loop.create_task(self._handles[btih].status_loop()))
await self._handles[btih].metadata_completed.wait()
if self.wait_start:
# fixme: temporary until we add streaming support, otherwise playback fails!
await self._handles[btih].started.wait()

def remove_torrent(self, btih, remove_files=False):
if btih in self._handles:
Expand All @@ -243,9 +240,17 @@ async def save_file(self, btih, download_directory):
handle = self._handles[btih]
await handle.resume()

def get_size(self, btih):
def get_total_size(self, btih):
return self._handles[btih].size

def get_index_from_name(self, btih, file_name):
return self._handles[btih].index_from_name(file_name)

def get_size(self, btih, file_name) -> Optional[int]:
for (path, size) in self.get_files(btih).items():
if os.path.basename(path) == file_name:
return size

def get_name(self, btih):
return self._handles[btih].name

Expand All @@ -255,14 +260,14 @@ def get_downloaded(self, btih):
def is_completed(self, btih):
return self._handles[btih].finished.is_set()

def stream_largest_file(self, btih, start, end):
def stream_file(self, btih, file_name, start, end):
handle = self._handles[btih]
return handle.stream_range_as_completed(handle.largest_file_index, start, end)
return handle.stream_range_as_completed(file_name, start, end)

def get_files(self, btih) -> Dict:
handle = self._handles[btih]
return {
handle.torrent_file.file_path(file_num): handle.torrent_file.file_size(file_num)
self.full_path(btih, file_num): handle.torrent_file.file_size(file_num)
for file_num in range(handle.torrent_file.num_files())
if '.pad' not in handle.torrent_file.file_path(file_num)
}
Expand Down Expand Up @@ -302,7 +307,7 @@ async def main():
await session.bind()
await session.add_torrent(btih, os.path.expanduser("~/Downloads"))
while True:
session.full_path(btih)
session.full_path(btih, 0)
await asyncio.sleep(1)
await session.pause()
executor.shutdown()
Expand Down
46 changes: 36 additions & 10 deletions lbry/torrent/torrent_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,33 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
super().__init__(loop, config, storage, identifier, file_name, download_directory, status, claim, download_id,
rowid, content_fee, analytics_manager, added_on)
self.torrent_session = torrent_session
self._suggested_file_name = None
self._full_path = None

@property
def full_path(self) -> Optional[str]:
full_path = self.torrent_session.full_path(self.identifier)
if not self._full_path:
self._full_path = self.select_path()
self._file_name = os.path.basename(self._full_path)
self.download_directory = self.torrent_session.save_path(self.identifier)
return full_path
return self._full_path

def select_path(self):
wanted_name = (self.stream_claim_info and self.stream_claim_info.claim.stream.source.name) or ''
wanted_index = self.torrent_session.get_index_from_name(self.identifier, wanted_name)
if wanted_index is None:
# maybe warn?
largest = None
for (path, size) in self.torrent_session.get_files(self.identifier).items():
largest = (path, size) if not largest or size > largest[1] else largest
return largest[0]
else:
return self.torrent_session.full_path(self.identifier, wanted_index or 0)

@property
def suggested_file_name(self):
self._suggested_file_name = self._suggested_file_name or os.path.basename(self.select_path())
return self._suggested_file_name

@property
def mime_type(self) -> Optional[str]:
Expand All @@ -58,14 +79,15 @@ async def setup(self, timeout: Optional[float] = None):
self.torrent_session.remove_torrent(btih=self.identifier)
raise DownloadMetadataTimeoutError(self.identifier)
self.download_directory = self.torrent_session.save_path(self.identifier)
self._file_name = Path(self.torrent_session.full_path(self.identifier)).name
self._file_name = os.path.basename(self.full_path)

async def start(self, timeout: Optional[float] = None, save_now: Optional[bool] = False):
await self.setup(timeout)
await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name)
self.rowid = await self.storage.save_downloaded_file(
self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on
)
if not self.rowid:
await self.storage.add_torrent(self.identifier, self.torrent_length, self.torrent_name)
self.rowid = await self.storage.save_downloaded_file(
self.identifier, self.file_name, self.download_directory, 0.0, added_on=self._added_on
)

async def stop(self, finished: bool = False):
await self.torrent_session.remove_torrent(self.identifier)
Expand All @@ -75,11 +97,11 @@ async def save_file(self, file_name: Optional[str] = None, download_directory: O

@property
def torrent_length(self):
return self.torrent_session.get_size(self.identifier)
return self.torrent_session.get_total_size(self.identifier)

@property
def stream_length(self):
return os.path.getsize(self.full_path)
return self.torrent_session.get_size(self.identifier, self.file_name)

@property
def written_bytes(self):
Expand Down Expand Up @@ -110,15 +132,19 @@ async def stream_file(self, request):
headers, start, end = self._prepare_range_response_headers(
request.headers.get('range', 'bytes=0-')
)
target = self.suggested_file_name
await self.start()
response = StreamResponse(
status=206,
headers=headers
)
await response.prepare(request)
while not os.path.exists(self.full_path):
async for _ in self.torrent_session.stream_file(self.identifier, target, start, end):
break
with open(self.full_path, 'rb') as infile:
infile.seek(start)
async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end):
async for read_size in self.torrent_session.stream_file(self.identifier, target, start, end):
if infile.tell() + read_size < end:
await response.write(infile.read(read_size))
else:
Expand Down

0 comments on commit 64aad14

Please sign in to comment.