Skip to content

Commit

Permalink
Merge pull request #953 from vespa-engine/thomasht86/fix-asyncclient-…
Browse files Browse the repository at this point in the history
…timeout-param

(bugfix) Fix asyncclient timeout param
  • Loading branch information
thomasht86 authored Oct 17, 2024
2 parents 244d5e6 + 80de57d commit ffaf0b7
Show file tree
Hide file tree
Showing 8 changed files with 12,977 additions and 12,761 deletions.
12,336 changes: 6,182 additions & 6,154 deletions docs/sphinx/source/examples/colpali-benchmark-vqa-vlm_Vespa-cloud.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/sphinx/source/examples/feed_performance.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@
" semaphore = asyncio.Semaphore(params.num_concurrent_requests)\n",
"\n",
" async with app.asyncio(\n",
" connections=params.max_connections, total_timeout=params.num_docs // 10\n",
" connections=params.max_connections, timeout=params.num_docs // 10\n",
" ) as async_app:\n",
" for doc in data:\n",
" async with semaphore:\n",
Expand Down
12,918 changes: 6,459 additions & 6,459 deletions docs/sphinx/source/examples/pdf-retrieval-with-ColQwen2-vlm_Vespa-cloud.ipynb

Large diffs are not rendered by default.

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ unittest = [
"pytest",
"requests-mock",
"vespacli",
"pytest-asyncio",
]
docs = [
"sphinx",
Expand Down
63 changes: 63 additions & 0 deletions tests/unit/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import gzip
from vespa.application import (
CustomHTTPAdapter,
VespaAsync,
)
import httpx


class TestVespaRequestsUsage(unittest.TestCase):
Expand Down Expand Up @@ -693,5 +695,66 @@ def test_retry_on_429_status(self):
self.assertEqual(mock_backoff.call_count, mock_send.call_count)


class MockVespa:
def __init__(
self,
base_headers=None,
auth_method=None,
vespa_cloud_secret_token=None,
cert=None,
key=None,
):
self.base_headers = base_headers or {}
self.auth_method = auth_method
self.vespa_cloud_secret_token = vespa_cloud_secret_token
self.cert = cert
self.key = key


# Test class
class TestVespaAsync:
def test_init_default(self):
app = MockVespa()
vespa_async = VespaAsync(app)
assert vespa_async.app == app
assert vespa_async.httpx_client is None
assert vespa_async.connections == 1
assert vespa_async.total_timeout is None
assert vespa_async.timeout == httpx.Timeout(5)
assert vespa_async.kwargs == {}
assert vespa_async.headers == app.base_headers
assert vespa_async.limits == httpx.Limits(max_keepalive_connections=1)

def test_init_total_timeout_warns(self):
app = MockVespa()
with pytest.warns(DeprecationWarning, match="total_timeout is deprecated"):
vespa_async = VespaAsync(app, total_timeout=10)
assert vespa_async.total_timeout == 10

def test_init_timeout_int(self):
app = MockVespa()
vespa_async = VespaAsync(app, timeout=10)
assert vespa_async.timeout == httpx.Timeout(10)

def test_init_timeout_timeout(self):
app = MockVespa()
timeout = httpx.Timeout(connect=5, read=10, write=15, pool=20)
vespa_async = VespaAsync(app, timeout=timeout)
assert vespa_async.timeout == timeout

def test_init_keepalive_expiry_warning(self):
app = MockVespa()
limits = httpx.Limits(keepalive_expiry=31)
with pytest.warns(
UserWarning, match="Keepalive expiry is set to more than 30 seconds"
):
_vespa_async = VespaAsync(app, limits=limits)

def test_init_no_keepalive_expiry_warning(self):
app = MockVespa()
limits = httpx.Limits(keepalive_expiry=1)
_vespa_async = VespaAsync(app, limits=limits)


if __name__ == "__main__":
unittest.main()
15 changes: 15 additions & 0 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 96 additions & 25 deletions vespa/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import asyncio
import traceback
import concurrent.futures
import warnings
from typing import Optional, Dict, Generator, List, IO, Iterable, Callable, Tuple, Union
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
from queue import Queue, Empty
Expand Down Expand Up @@ -134,7 +135,11 @@ def __init__(
self.auth_method = "mtls"

def asyncio(
self, connections: Optional[int] = 8, total_timeout: int = 10
self,
connections: Optional[int] = 1,
total_timeout: Optional[int] = None,
timeout: Union[httpx.Timeout, int] = httpx.Timeout(5),
**kwargs,
) -> "VespaAsync":
"""
Access Vespa asynchronous connection layer.
Expand All @@ -145,14 +150,26 @@ def asyncio(
async with app.asyncio() as async_app:
response = await async_app.query(body=body)
See :class:`VespaAsync` for more details.
# passing kwargs
limits = httpx.Limits(max_keepalive_connections=5, max_connections=5, keepalive_expiry=15)
timeout = httpx.Timeout(connect=3, read=4, write=2, pool=5)
async with app.asyncio(connections=5, timeout=timeout, limits=limits) as async_app:
response = await async_app.query(body=body)
:param connections: Number of allowed concurrent connections
:param total_timeout: Total timeout in secs.
See :class:`VespaAsync` for more details on the parameters.
:param connections: Number of maximum_keepalive_connections.
:param total_timeout: Deprecated. Will be ignored. Use timeout instead.
:param timeout: httpx.Timeout object. See https://www.python-httpx.org/advanced/timeouts/. Defaults to 5 seconds.
:param kwargs: Additional arguments to be passed to the httpx.AsyncClient.
:return: Instance of Vespa asynchronous layer.
"""
return VespaAsync(
app=self, connections=connections, total_timeout=total_timeout
app=self,
connections=connections,
total_timeout=total_timeout,
timeout=timeout,
**kwargs,
)

def syncio(
Expand Down Expand Up @@ -1468,36 +1485,95 @@ def update_data(

class VespaAsync(object):
def __init__(
self, app: Vespa, connections: Optional[int] = 1, total_timeout: int = 10
self,
app: Vespa,
connections: Optional[int] = 1,
total_timeout: Optional[int] = None,
timeout: Union[httpx.Timeout, int] = httpx.Timeout(5),
**kwargs,
) -> None:
"""
Class to handle async HTTP-connection(s) to Vespa.
Uses httpx as the async http client, and HTTP/2 by default.
Class to handle asynchronous HTTP connections to Vespa.
Uses `httpx` as the async HTTP client, and HTTP/2 by default.
This class is intended to be used as a context manager.
Example usage::
**Basic usage**::
async with VespaAsync(app) as async_app:
response = await async_app.query(body={"yql": "select * from sources * where title contains 'music';"})
async with VespaAsync(app) as async_app:
response = await async_app.query(
body={"yql": "select * from sources * where title contains 'music';"}
)
Can also be accessed directly from :func:`Vespa.asyncio` ::
**Passing custom timeout and limits**::
app = Vespa(url="localhost", port=8080)
async with app.asyncio() as async_app:
response = await async_app.query(body={"yql": "select * from sources * where title contains 'music';"})
import httpx
timeout = httpx.Timeout(10.0, connect=5.0)
limits = httpx.Limits(max_connections=10, max_keepalive_connections=5)
async with VespaAsync(app, timeout=timeout, limits=limits) as async_app:
response = await async_app.query(
body={"yql": "select * from sources * where title contains 'music';"}
)
**Using additional kwargs (e.g., proxies)**::
proxies = "http://localhost:8080"
async with VespaAsync(app, proxies=proxies) as async_app:
response = await async_app.query(
body={"yql": "select * from sources * where title contains 'music';"}
)
**Accessing via :func:`Vespa.asyncio`**::
app = Vespa(url="localhost", port=8080)
async with app.asyncio(timeout=timeout, limits=limits) as async_app:
response = await async_app.query(
body={"yql": "select * from sources * where title contains 'music';"}
)
See also :func:`Vespa.feed_async_iterable` for a convenient interface to async data feeding.
Args:
app (Vespa): Vespa application object.
connections (Optional[int], optional): number of connections. Defaults to 1 as HTTP/2 is multiplexed.
total_timeout (int, optional): timeout for each individual request in seconds. Defaults to 10.
connections (Optional[int], optional): Number of connections. Defaults to 1 as HTTP/2 is multiplexed.
total_timeout (int, optional): **Deprecated**. Will be ignored and removed in future versions.
Use `timeout` to pass an `httpx.Timeout` object instead.
timeout (httpx.Timeout, optional): Timeout settings for the `httpx.AsyncClient`. Defaults to `httpx.Timeout(5)`.
**kwargs: Additional arguments to be passed to the `httpx.AsyncClient`. See
[HTTPX AsyncClient documentation](https://www.python-httpx.org/api/#asyncclient) for more details.
Note:
- Passing `timeout` allows you to configure timeouts for connect, read, write, and overall request time.
- The `limits` parameter can be used to control connection pooling behavior, such as the maximum number of concurrent connections.
- See https://www.python-httpx.org/ for more information on `httpx` and its features.
"""
self.app = app
self.httpx_client = None
self.connections = connections
self.total_timeout = total_timeout
if self.total_timeout is not None:
# issue DeprecationWarning
warnings.warn(
"total_timeout is deprecated, will be ignored and will be removed in future versions. Use timeout to pass a httpx.Timeout object instead.",
category=DeprecationWarning,
)
self.timeout = timeout
if isinstance(self.timeout, int):
self.timeout = httpx.Timeout(timeout)
self.kwargs = kwargs
self.headers = self.app.base_headers.copy()
self.limits = kwargs.get(
"limits", httpx.Limits(max_keepalive_connections=self.connections)
)
# Warn if limits.keepalive_expiry is higher than 30 seconds
if self.limits.keepalive_expiry and self.limits.keepalive_expiry > 30:
warnings.warn(
"Keepalive expiry is set to more than 30 seconds. Vespa server resets idle connections, so this may cause ConnectionResetError.",
category=UserWarning,
)
if self.app.auth_method == "token" and self.app.vespa_cloud_secret_token:
# Bearer and user-agent
self.headers.update(
Expand All @@ -1514,23 +1590,18 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
def _open_httpx_client(self):
if self.httpx_client is not None:
return
limits = httpx.Limits(
max_keepalive_connections=self.connections,
max_connections=self.connections,
keepalive_expiry=10, # This should NOT exceed the keepalive_timeout on the Server, otherwise we will get ConnectionTerminated errors.
)
timeout = httpx.Timeout(pool=5, connect=5, read=5, write=5)

if self.app.cert is not None:
sslcontext = httpx.create_ssl_context(cert=(self.app.cert, self.app.key))
else:
sslcontext = False
self.httpx_client = httpx.AsyncClient(
timeout=timeout,
timeout=self.timeout,
headers=self.headers,
verify=sslcontext,
http2=True, # HTTP/2 by default
http1=False,
limits=limits,
**self.kwargs,
)
return self.httpx_client

Expand Down

0 comments on commit ffaf0b7

Please sign in to comment.