Skip to content

Commit

Permalink
stream from torrent pieces, holding the response until the piece is c…
Browse files Browse the repository at this point in the history
…ompleted
  • Loading branch information
shyba committed Sep 5, 2022
1 parent 86ec8f2 commit 4fbe4e8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
33 changes: 30 additions & 3 deletions lbry/torrent/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import random
from hashlib import sha1
from tempfile import mkdtemp
from typing import Optional
from typing import Optional, Tuple

import libtorrent

Expand Down Expand Up @@ -65,9 +65,13 @@ def __init__(self, loop, executor, handle):
self.total_wanted_done = 0
self.name = ''
self.tasks = []
self.torrent_file: Optional[libtorrent.file_storage] = None
self._torrent_info: libtorrent.torrent_info = handle.torrent_file()
self._base_path = None

@property
def torrent_file(self) -> Optional[libtorrent.file_storage]:
return self._torrent_info.files()

@property
def largest_file(self) -> Optional[str]:
if self.torrent_file is None:
Expand All @@ -92,6 +96,25 @@ def stop_tasks(self):
while self.tasks:
self.tasks.pop().cancel()

def byte_range_to_piece_range(
self, file_index, start_offset, end_offset) -> Tuple[libtorrent.peer_request, libtorrent.peer_request]:
start_piece = self._torrent_info.map_file(file_index, start_offset, 0)
end_piece = self._torrent_info.map_file(file_index, end_offset, 0)
return start_piece, end_piece

async def stream_range_as_completed(self, file_index, start, end):
first_piece, final_piece = self.byte_range_to_piece_range(file_index, start, end)
start_piece_offset = final_piece.start
piece_size = self._torrent_info.piece_length()
log.info("Streaming torrent from piece %d to %d (bytes: %d -> %d): %s",
first_piece.piece, final_piece.piece, start, end, self.name)
for piece_index in range(first_piece.piece, final_piece.piece + 1):
while not self._handle.have_piece(piece_index):
log.info("Waiting for piece %d: %s", piece_index, self.name)
await asyncio.sleep(0.2)
log.info("Streaming piece offset %d / %d for torrent %s", piece_index, final_piece.piece, self.name)
yield piece_size - start_piece_offset

def _show_status(self):
# fixme: cleanup
if not self._handle.is_valid():
Expand All @@ -103,8 +126,8 @@ def _show_status(self):
self.name = status.name
if not self.metadata_completed.is_set():
self.metadata_completed.set()
self._torrent_info = self._handle.torrent_file()
log.info("Metadata completed for btih:%s - %s", status.info_hash, self.name)
self.torrent_file = self._handle.torrent_file().files()
self._base_path = status.save_path
first_piece = self.torrent_file.piece_index_at_file(self.largest_file_index)
if not self.started.is_set():
Expand Down Expand Up @@ -254,6 +277,10 @@ def get_downloaded(self, btih):
def is_completed(self, btih):
return self._handles[btih].finished.is_set()

def stream_largest_file(self, btih, start, end):
handle = self._handles[btih]
return handle.stream_range_as_completed(handle.largest_file_index, start, end)


def get_magnet_uri(btih):
return f"magnet:?xt=urn:btih:{btih}"
Expand Down
12 changes: 8 additions & 4 deletions lbry/torrent/torrent_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def completed(self):
async def stream_file(self, request):
log.info("stream torrent to browser for lbry://%s#%s (btih %s...)", self.claim_name, self.claim_id,
self.identifier[:6])
headers, size, start, end = self._prepare_range_response_headers(
headers, start, end = self._prepare_range_response_headers(
request.headers.get('range', 'bytes=0-')
)
await self.start()
Expand All @@ -105,9 +105,13 @@ async def stream_file(self, request):
await response.prepare(request)
with open(self.full_path, 'rb') as infile:
infile.seek(start)
await response.write_eof(infile.read(size))
async for read_size in self.torrent_session.stream_largest_file(self.identifier, start, end):
if start + read_size < end:
await response.write(infile.read(read_size))
else:
await response.write_eof(infile.read(end - infile.tell()))

def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int, int]:
def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing.Dict[str, str], int, int]:
if '=' in get_range:
get_range = get_range.split('=')[1]
start, end = get_range.split('-')
Expand All @@ -126,7 +130,7 @@ def _prepare_range_response_headers(self, get_range: str) -> typing.Tuple[typing
'Content-Length': str(final_size),
'Content-Type': self.mime_type
}
return headers, final_size, start, end
return headers, start, end


class TorrentManager(SourceManager):
Expand Down

0 comments on commit 4fbe4e8

Please sign in to comment.