From 6af8e61f98d5fca019173464302a024d81806cc5 Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Mon, 25 Nov 2024 14:35:46 +0200 Subject: [PATCH 01/14] don't block GIL in tokenization (preprocess) in OpenAI compatible server by using threadpool for tokenization Signed-off-by: Tomer Asida --- vllm/commit_id.py | 1 + vllm/entrypoints/openai/serving_completion.py | 2 +- vllm/entrypoints/openai/serving_embedding.py | 2 +- vllm/entrypoints/openai/serving_engine.py | 50 +++++++++++++++++-- .../openai/serving_tokenization.py | 4 +- 5 files changed, 51 insertions(+), 8 deletions(-) create mode 100644 vllm/commit_id.py diff --git a/vllm/commit_id.py b/vllm/commit_id.py new file mode 100644 index 0000000000000..d7cc0000c4225 --- /dev/null +++ b/vllm/commit_id.py @@ -0,0 +1 @@ +__commit__ = "534e75c1193165605aee6d741deb4a1536c58ff1" diff --git a/vllm/entrypoints/openai/serving_completion.py b/vllm/entrypoints/openai/serving_completion.py index 936aae8f1c267..fc1c4908d6650 100644 --- a/vllm/entrypoints/openai/serving_completion.py +++ b/vllm/entrypoints/openai/serving_completion.py @@ -101,7 +101,7 @@ async def create_completion( tokenizer = await self.engine_client.get_tokenizer(lora_request) - request_prompts, engine_prompts = self._preprocess_completion( + request_prompts, engine_prompts = await self._preprocess_completion( request, tokenizer, request.prompt, diff --git a/vllm/entrypoints/openai/serving_embedding.py b/vllm/entrypoints/openai/serving_embedding.py index c84a7d2d8e13e..486edfa1ad323 100644 --- a/vllm/entrypoints/openai/serving_embedding.py +++ b/vllm/entrypoints/openai/serving_embedding.py @@ -156,7 +156,7 @@ async def create_embedding( add_special_tokens=request.add_special_tokens, ) else: - request_prompts, engine_prompts = self._preprocess_completion( + request_prompts, engine_prompts = await self._preprocess_completion( request, tokenizer, request.input, diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index cae2877ea7e99..471b2731c51ac 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -46,7 +46,7 @@ from vllm.tracing import (contains_trace_headers, extract_trace_headers, log_tracing_disabled_warning) from vllm.transformers_utils.tokenizer import AnyTokenizer, MistralTokenizer -from vllm.utils import AtomicCounter, is_list_of +from vllm.utils import AtomicCounter, is_list_of, make_async logger = init_logger(__name__) @@ -397,7 +397,49 @@ def _tokenize_prompt_input_or_inputs( truncate_prompt_tokens=truncate_prompt_tokens, ) - def _preprocess_completion( + async def _tokenize_prompt_input_async( + self, + request: AnyRequest, + tokenizer: AnyTokenizer, + prompt_input: Union[str, List[int]], + truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, + add_special_tokens: bool = True, + ) -> TextTokensPrompt: + return await make_async(self._tokenize_prompt_input)(request=request, + tokenizer=tokenizer, + prompt_input=prompt_input, + truncate_prompt_tokens=truncate_prompt_tokens, + add_special_tokens=add_special_tokens) + + async def _tokenize_prompt_inputs_async( + self, + request: AnyRequest, + tokenizer: AnyTokenizer, + prompt_inputs: Iterable[Union[str, List[int]]], + truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, + add_special_tokens: bool = True, + ) -> Iterator[TextTokensPrompt]: + return await make_async(self._tokenize_prompt_inputs)(request=request, + tokenizer=tokenizer, + prompt_inputs=prompt_inputs, + truncate_prompt_tokens=truncate_prompt_tokens, + add_special_tokens=add_special_tokens) + + async def _tokenize_prompt_input_or_inputs_async( + self, + request: AnyRequest, + tokenizer: AnyTokenizer, + input_or_inputs: Union[str, List[str], List[int], List[List[int]]], + truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, + add_special_tokens: bool = True, + ) -> Iterator[TextTokensPrompt]: + return await make_async(self._tokenize_prompt_input_or_inputs)(request=request, + tokenizer=tokenizer, + input_or_inputs=input_or_inputs, + truncate_prompt_tokens=truncate_prompt_tokens, + add_special_tokens=add_special_tokens) + + async def _preprocess_completion( self, request: CompletionLikeRequest, tokenizer: AnyTokenizer, @@ -407,7 +449,7 @@ def _preprocess_completion( ) -> Tuple[Sequence[TextTokensPrompt], List[TokensPrompt]]: request_prompts = [ request_prompt - for request_prompt in self._tokenize_prompt_input_or_inputs( + for request_prompt in await self._tokenize_prompt_input_or_inputs_async( request, tokenizer, input_or_inputs, @@ -493,7 +535,7 @@ async def _preprocess_chat( request=request) if isinstance(request_prompt, str): - prompt_inputs = self._tokenize_prompt_input( + prompt_inputs = await self._tokenize_prompt_input_async( request, tokenizer, request_prompt, diff --git a/vllm/entrypoints/openai/serving_tokenization.py b/vllm/entrypoints/openai/serving_tokenization.py index 59b3b1311f881..b31ca0053d793 100644 --- a/vllm/entrypoints/openai/serving_tokenization.py +++ b/vllm/entrypoints/openai/serving_tokenization.py @@ -81,7 +81,7 @@ async def create_tokenize( add_special_tokens=request.add_special_tokens, ) else: - request_prompts, engine_prompts = self._preprocess_completion( + request_prompts, engine_prompts = await self._preprocess_completion( request, tokenizer, request.prompt, @@ -134,7 +134,7 @@ async def create_detokenize( # Silently ignore prompt adapter since it does not affect tokenization # (Unlike in Embeddings API where an error is raised) - prompt_input = self._tokenize_prompt_input( + prompt_input = await self._tokenize_prompt_input_async( request, tokenizer, request.tokens, From 821665bf3ff589b296f39a8733c8c8fa838255e5 Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Mon, 25 Nov 2024 14:51:13 +0200 Subject: [PATCH 02/14] format Signed-off-by: Tomer Asida --- vllm/entrypoints/openai/serving_embedding.py | 15 ++-- vllm/entrypoints/openai/serving_engine.py | 75 ++++++++++--------- .../openai/serving_tokenization.py | 13 ++-- 3 files changed, 54 insertions(+), 49 deletions(-) diff --git a/vllm/entrypoints/openai/serving_embedding.py b/vllm/entrypoints/openai/serving_embedding.py index 486edfa1ad323..78e2416d9d4da 100644 --- a/vllm/entrypoints/openai/serving_embedding.py +++ b/vllm/entrypoints/openai/serving_embedding.py @@ -156,13 +156,14 @@ async def create_embedding( add_special_tokens=request.add_special_tokens, ) else: - request_prompts, engine_prompts = await self._preprocess_completion( - request, - tokenizer, - request.input, - truncate_prompt_tokens=truncate_prompt_tokens, - add_special_tokens=request.add_special_tokens, - ) + (request_prompts, + engine_prompts) = await self._preprocess_completion( + request, + tokenizer, + request.input, + truncate_prompt_tokens=truncate_prompt_tokens, + add_special_tokens=request.add_special_tokens, + ) except ValueError as e: logger.exception("Error in preprocessing prompt inputs") return self.create_error_response(str(e)) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 471b2731c51ac..3881a1eea2095 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -398,46 +398,49 @@ def _tokenize_prompt_input_or_inputs( ) async def _tokenize_prompt_input_async( - self, - request: AnyRequest, - tokenizer: AnyTokenizer, - prompt_input: Union[str, List[int]], - truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, - add_special_tokens: bool = True, - ) -> TextTokensPrompt: - return await make_async(self._tokenize_prompt_input)(request=request, - tokenizer=tokenizer, - prompt_input=prompt_input, - truncate_prompt_tokens=truncate_prompt_tokens, - add_special_tokens=add_special_tokens) + self, + request: AnyRequest, + tokenizer: AnyTokenizer, + prompt_input: Union[str, List[int]], + truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, + add_special_tokens: bool = True, + ) -> TextTokensPrompt: + return await make_async(self._tokenize_prompt_input)( + request=request, + tokenizer=tokenizer, + prompt_input=prompt_input, + truncate_prompt_tokens=truncate_prompt_tokens, + add_special_tokens=add_special_tokens) async def _tokenize_prompt_inputs_async( - self, - request: AnyRequest, - tokenizer: AnyTokenizer, - prompt_inputs: Iterable[Union[str, List[int]]], - truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, - add_special_tokens: bool = True, + self, + request: AnyRequest, + tokenizer: AnyTokenizer, + prompt_inputs: Iterable[Union[str, List[int]]], + truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, + add_special_tokens: bool = True, ) -> Iterator[TextTokensPrompt]: - return await make_async(self._tokenize_prompt_inputs)(request=request, - tokenizer=tokenizer, - prompt_inputs=prompt_inputs, - truncate_prompt_tokens=truncate_prompt_tokens, - add_special_tokens=add_special_tokens) + return await make_async(self._tokenize_prompt_inputs)( + request=request, + tokenizer=tokenizer, + prompt_inputs=prompt_inputs, + truncate_prompt_tokens=truncate_prompt_tokens, + add_special_tokens=add_special_tokens) async def _tokenize_prompt_input_or_inputs_async( - self, - request: AnyRequest, - tokenizer: AnyTokenizer, - input_or_inputs: Union[str, List[str], List[int], List[List[int]]], - truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, - add_special_tokens: bool = True, + self, + request: AnyRequest, + tokenizer: AnyTokenizer, + input_or_inputs: Union[str, List[str], List[int], List[List[int]]], + truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, + add_special_tokens: bool = True, ) -> Iterator[TextTokensPrompt]: - return await make_async(self._tokenize_prompt_input_or_inputs)(request=request, - tokenizer=tokenizer, - input_or_inputs=input_or_inputs, - truncate_prompt_tokens=truncate_prompt_tokens, - add_special_tokens=add_special_tokens) + return await make_async(self._tokenize_prompt_input_or_inputs)( + request=request, + tokenizer=tokenizer, + input_or_inputs=input_or_inputs, + truncate_prompt_tokens=truncate_prompt_tokens, + add_special_tokens=add_special_tokens) async def _preprocess_completion( self, @@ -448,8 +451,8 @@ async def _preprocess_completion( add_special_tokens: bool = True, ) -> Tuple[Sequence[TextTokensPrompt], List[TokensPrompt]]: request_prompts = [ - request_prompt - for request_prompt in await self._tokenize_prompt_input_or_inputs_async( + request_prompt for request_prompt in await + self._tokenize_prompt_input_or_inputs_async( request, tokenizer, input_or_inputs, diff --git a/vllm/entrypoints/openai/serving_tokenization.py b/vllm/entrypoints/openai/serving_tokenization.py index b31ca0053d793..9c3dc2c98b2dd 100644 --- a/vllm/entrypoints/openai/serving_tokenization.py +++ b/vllm/entrypoints/openai/serving_tokenization.py @@ -81,12 +81,13 @@ async def create_tokenize( add_special_tokens=request.add_special_tokens, ) else: - request_prompts, engine_prompts = await self._preprocess_completion( - request, - tokenizer, - request.prompt, - add_special_tokens=request.add_special_tokens, - ) + (request_prompts, + engine_prompts) = await self._preprocess_completion( + request, + tokenizer, + request.prompt, + add_special_tokens=request.add_special_tokens, + ) except ValueError as e: logger.exception("Error in preprocessing prompt inputs") return self.create_error_response(str(e)) From 5f2164a58d0c4205b570e2370b22bf3b434b03da Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Mon, 25 Nov 2024 15:00:55 +0200 Subject: [PATCH 03/14] remove commit_id that was mistakenly added Signed-off-by: Tomer Asida --- vllm/commit_id.py | 1 - 1 file changed, 1 deletion(-) delete mode 100644 vllm/commit_id.py diff --git a/vllm/commit_id.py b/vllm/commit_id.py deleted file mode 100644 index d7cc0000c4225..0000000000000 --- a/vllm/commit_id.py +++ /dev/null @@ -1 +0,0 @@ -__commit__ = "534e75c1193165605aee6d741deb4a1536c58ff1" From dd01b53f97b9d9b146ff15940efbb6499989e2fd Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Mon, 25 Nov 2024 15:06:26 +0200 Subject: [PATCH 04/14] simpler - just assign methods in init Signed-off-by: Tomer Asida --- vllm/entrypoints/openai/serving_engine.py | 49 ++--------------------- 1 file changed, 4 insertions(+), 45 deletions(-) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 3881a1eea2095..1859d67f04f37 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -140,6 +140,10 @@ def __init__( self.request_logger = request_logger self.return_tokens_as_token_ids = return_tokens_as_token_ids + self._tokenize_prompt_input_async = make_async(self._tokenize_prompt_input) + self._tokenize_prompt_inputs_async = make_async(self._tokenize_prompt_inputs) + self._tokenize_prompt_input_or_inputs_async = make_async(self._tokenize_prompt_input_or_inputs) + async def show_available_models(self) -> ModelList: """Show available models. Right now we only have one model.""" model_cards = [ @@ -397,51 +401,6 @@ def _tokenize_prompt_input_or_inputs( truncate_prompt_tokens=truncate_prompt_tokens, ) - async def _tokenize_prompt_input_async( - self, - request: AnyRequest, - tokenizer: AnyTokenizer, - prompt_input: Union[str, List[int]], - truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, - add_special_tokens: bool = True, - ) -> TextTokensPrompt: - return await make_async(self._tokenize_prompt_input)( - request=request, - tokenizer=tokenizer, - prompt_input=prompt_input, - truncate_prompt_tokens=truncate_prompt_tokens, - add_special_tokens=add_special_tokens) - - async def _tokenize_prompt_inputs_async( - self, - request: AnyRequest, - tokenizer: AnyTokenizer, - prompt_inputs: Iterable[Union[str, List[int]]], - truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, - add_special_tokens: bool = True, - ) -> Iterator[TextTokensPrompt]: - return await make_async(self._tokenize_prompt_inputs)( - request=request, - tokenizer=tokenizer, - prompt_inputs=prompt_inputs, - truncate_prompt_tokens=truncate_prompt_tokens, - add_special_tokens=add_special_tokens) - - async def _tokenize_prompt_input_or_inputs_async( - self, - request: AnyRequest, - tokenizer: AnyTokenizer, - input_or_inputs: Union[str, List[str], List[int], List[List[int]]], - truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, - add_special_tokens: bool = True, - ) -> Iterator[TextTokensPrompt]: - return await make_async(self._tokenize_prompt_input_or_inputs)( - request=request, - tokenizer=tokenizer, - input_or_inputs=input_or_inputs, - truncate_prompt_tokens=truncate_prompt_tokens, - add_special_tokens=add_special_tokens) - async def _preprocess_completion( self, request: CompletionLikeRequest, From 4a6efcb57ef729ec0cf8f2902819ef7782d5feba Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Mon, 25 Nov 2024 15:08:50 +0200 Subject: [PATCH 05/14] format Signed-off-by: Tomer Asida --- vllm/entrypoints/openai/serving_engine.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 1859d67f04f37..5f58cd66fc8b7 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -140,9 +140,12 @@ def __init__( self.request_logger = request_logger self.return_tokens_as_token_ids = return_tokens_as_token_ids - self._tokenize_prompt_input_async = make_async(self._tokenize_prompt_input) - self._tokenize_prompt_inputs_async = make_async(self._tokenize_prompt_inputs) - self._tokenize_prompt_input_or_inputs_async = make_async(self._tokenize_prompt_input_or_inputs) + self._tokenize_prompt_input_async = make_async( + self._tokenize_prompt_input) + self._tokenize_prompt_inputs_async = make_async( + self._tokenize_prompt_inputs) + self._tokenize_prompt_input_or_inputs_async = make_async( + self._tokenize_prompt_input_or_inputs) async def show_available_models(self) -> ModelList: """Show available models. Right now we only have one model.""" From f89eaa03dac02d33d948dca0ed28021fe0dc89f0 Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Mon, 25 Nov 2024 15:21:51 +0200 Subject: [PATCH 06/14] async tokenization also in serving_score.py Signed-off-by: Tomer Asida --- vllm/entrypoints/openai/serving_score.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/vllm/entrypoints/openai/serving_score.py b/vllm/entrypoints/openai/serving_score.py index 156fea6f47982..845365cd0575d 100644 --- a/vllm/entrypoints/openai/serving_score.py +++ b/vllm/entrypoints/openai/serving_score.py @@ -15,7 +15,7 @@ from vllm.logger import init_logger from vllm.outputs import EmbeddingRequestOutput from vllm.transformers_utils.tokenizers.mistral import MistralTokenizer -from vllm.utils import merge_async_iterators, random_uuid +from vllm.utils import merge_async_iterators, random_uuid, make_async logger = init_logger(__name__) @@ -145,9 +145,10 @@ async def create_score( tokenization_kwargs["truncation"] = True tokenization_kwargs["max_length"] = truncate_prompt_tokens - prompt_inputs = tokenizer(text=q, - text_pair=t, - **tokenization_kwargs) + tokenize_async = make_async(tokenizer.__call__) + prompt_inputs = await tokenize_async(text=q, + text_pair=t, + **tokenization_kwargs) engine_prompt = TokensPrompt( prompt_token_ids=prompt_inputs["input_ids"], token_type_ids=prompt_inputs.get("token_type_ids")) From 980fff886f93e300a456b2ba05e6ffe95e52c0d2 Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Mon, 25 Nov 2024 15:22:58 +0200 Subject: [PATCH 07/14] format Signed-off-by: Tomer Asida --- vllm/entrypoints/openai/serving_score.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/entrypoints/openai/serving_score.py b/vllm/entrypoints/openai/serving_score.py index 845365cd0575d..bf54730a42023 100644 --- a/vllm/entrypoints/openai/serving_score.py +++ b/vllm/entrypoints/openai/serving_score.py @@ -15,7 +15,7 @@ from vllm.logger import init_logger from vllm.outputs import EmbeddingRequestOutput from vllm.transformers_utils.tokenizers.mistral import MistralTokenizer -from vllm.utils import merge_async_iterators, random_uuid, make_async +from vllm.utils import make_async, merge_async_iterators, random_uuid logger = init_logger(__name__) From da646c10cf2b16acac6788edfdebdf975fac01fc Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Mon, 25 Nov 2024 20:37:36 +0200 Subject: [PATCH 08/14] no need to make self._tokenize_prompt_inputs async as it's used only in self._tokenize_prompt_input Signed-off-by: Tomer Asida --- vllm/entrypoints/openai/serving_engine.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 5f58cd66fc8b7..7d18057f3fc98 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -142,8 +142,6 @@ def __init__( self._tokenize_prompt_input_async = make_async( self._tokenize_prompt_input) - self._tokenize_prompt_inputs_async = make_async( - self._tokenize_prompt_inputs) self._tokenize_prompt_input_or_inputs_async = make_async( self._tokenize_prompt_input_or_inputs) From b61a04ff01c5f5990e0335de16d214620f09169e Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Mon, 25 Nov 2024 20:37:58 +0200 Subject: [PATCH 09/14] make self._tokenize_prompt_input_or_inputs return a list so make_async will actually make execution run in thread and not just generator creation Signed-off-by: Tomer Asida --- vllm/entrypoints/openai/serving_engine.py | 60 +++++++++++------------ 1 file changed, 28 insertions(+), 32 deletions(-) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index 7d18057f3fc98..d3ec1361ac630 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -373,7 +373,7 @@ def _tokenize_prompt_input_or_inputs( input_or_inputs: Union[str, List[str], List[int], List[List[int]]], truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, add_special_tokens: bool = True, - ) -> Iterator[TextTokensPrompt]: + ) -> List[TextTokensPrompt]: """ Tokenize/detokenize depending on the input format. @@ -381,26 +381,25 @@ def _tokenize_prompt_input_or_inputs( , each input can be a string or array of tokens. Note that each request can pass one or more inputs. """ - for prompt_input in parse_and_batch_prompt(input_or_inputs): - # Although our type checking is based on mypy, - # VSCode Pyright extension should still work properly - # "is True" is required for Pyright to perform type narrowing - # See: https://github.com/microsoft/pyright/issues/7672 - if prompt_input["is_tokens"] is False: - yield self._normalize_prompt_text_to_input( - request, - tokenizer, - prompt=prompt_input["content"], - truncate_prompt_tokens=truncate_prompt_tokens, - add_special_tokens=add_special_tokens, - ) - else: - yield self._normalize_prompt_tokens_to_input( - request, - tokenizer, - prompt_ids=prompt_input["content"], - truncate_prompt_tokens=truncate_prompt_tokens, - ) + # Although our type checking is based on mypy, + # VSCode Pyright extension should still work properly + # "is True" is required for Pyright to perform type narrowing + # See: https://github.com/microsoft/pyright/issues/7672 + return [ + self._normalize_prompt_text_to_input( + request, + tokenizer, + prompt=prompt_input["content"], + truncate_prompt_tokens=truncate_prompt_tokens, + add_special_tokens=add_special_tokens) + if prompt_input["is_tokens"] is False else + self._normalize_prompt_tokens_to_input( + request, + tokenizer, + prompt_ids=prompt_input["content"], + truncate_prompt_tokens=truncate_prompt_tokens) + for prompt_input in parse_and_batch_prompt(input_or_inputs) + ] async def _preprocess_completion( self, @@ -409,17 +408,14 @@ async def _preprocess_completion( input_or_inputs: Union[str, List[str], List[int], List[List[int]]], truncate_prompt_tokens: Optional[Annotated[int, Field(ge=1)]] = None, add_special_tokens: bool = True, - ) -> Tuple[Sequence[TextTokensPrompt], List[TokensPrompt]]: - request_prompts = [ - request_prompt for request_prompt in await - self._tokenize_prompt_input_or_inputs_async( - request, - tokenizer, - input_or_inputs, - truncate_prompt_tokens=truncate_prompt_tokens, - add_special_tokens=add_special_tokens, - ) - ] + ) -> Tuple[List[TextTokensPrompt], List[TokensPrompt]]: + request_prompts = await self._tokenize_prompt_input_or_inputs_async( + request, + tokenizer, + input_or_inputs, + truncate_prompt_tokens=truncate_prompt_tokens, + add_special_tokens=add_special_tokens, + ) engine_prompts = [ TokensPrompt(prompt_token_ids=request_prompt["prompt_token_ids"]) From e4cb992357a6ff4f882e0e6324b3e62c49570a58 Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Tue, 26 Nov 2024 16:16:57 +0200 Subject: [PATCH 10/14] introduce threadsafe tokenizer and use in MQLLMEngineClient Signed-off-by: Tomer Asida --- vllm/engine/multiprocessing/client.py | 1 + vllm/transformers_utils/tokenizer.py | 27 +++++++++++++++++++ .../tokenizer_group/__init__.py | 6 +++-- 3 files changed, 32 insertions(+), 2 deletions(-) diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index fe21c58c775fe..718973d22a228 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -94,6 +94,7 @@ def __init__(self, ipc_path: str, engine_config: VllmConfig, scheduler_config=engine_config.scheduler_config, parallel_config=engine_config.parallel_config, enable_lora=bool(engine_config.lora_config), + use_threadsafe=True, ) self.input_preprocessor = InputPreprocessor(self.model_config, self.tokenizer) diff --git a/vllm/transformers_utils/tokenizer.py b/vllm/transformers_utils/tokenizer.py index 54f9f895fe541..85d67b1982407 100644 --- a/vllm/transformers_utils/tokenizer.py +++ b/vllm/transformers_utils/tokenizer.py @@ -1,4 +1,5 @@ import os +import threading import warnings from pathlib import Path from types import MethodType @@ -21,6 +22,28 @@ MistralTokenizer] +def get_threadsafe_tokenizer(tokenizer: AnyTokenizer) -> AnyTokenizer: + """Get a threadsafe tokenizer. + + This will patch the tokenizer object in place. + + This is necessary because transformers tokenizers are not threadsafe + by default, and can lead to crashes when used in a multithreaded.""" + + lock = threading.Lock() + + class ThreadsafeTokenizer(tokenizer.__class__): # type: ignore + def __call__(self, *args, **kwargs): + with lock: + logger.info(f"using lock in thread {threading.get_ident()}") + return super().__call__(*args, **kwargs) + + ThreadsafeTokenizer.__name__ = f"Threadsafe{tokenizer.__class__.__name__}" + + tokenizer.__class__ = ThreadsafeTokenizer + return tokenizer + + def get_cached_tokenizer(tokenizer: AnyTokenizer) -> AnyTokenizer: """Get tokenizer with cached properties. @@ -91,6 +114,7 @@ def get_tokenizer( trust_remote_code: bool = False, revision: Optional[str] = None, download_dir: Optional[str] = None, + use_threadsafe: bool = False, **kwargs, ) -> AnyTokenizer: """Gets a tokenizer for the given model name via HuggingFace or ModelScope. @@ -176,6 +200,9 @@ def get_tokenizer( "slowdown. Consider using a fast tokenizer instead.") tokenizer = get_cached_tokenizer(tokenizer) + if use_threadsafe: + tokenizer = get_threadsafe_tokenizer(tokenizer) + return tokenizer diff --git a/vllm/transformers_utils/tokenizer_group/__init__.py b/vllm/transformers_utils/tokenizer_group/__init__.py index 6a114b513f382..d1e52ceebda46 100644 --- a/vllm/transformers_utils/tokenizer_group/__init__.py +++ b/vllm/transformers_utils/tokenizer_group/__init__.py @@ -16,14 +16,16 @@ def init_tokenizer_from_configs(model_config: ModelConfig, scheduler_config: SchedulerConfig, parallel_config: ParallelConfig, - enable_lora: bool): + enable_lora: bool, + use_threadsafe: bool = False): init_kwargs = dict(tokenizer_id=model_config.tokenizer, enable_lora=enable_lora, max_num_seqs=scheduler_config.max_num_seqs, max_input_length=None, tokenizer_mode=model_config.tokenizer_mode, trust_remote_code=model_config.trust_remote_code, - revision=model_config.tokenizer_revision) + revision=model_config.tokenizer_revision, + use_threadsafe=use_threadsafe) if (model_config.encoder_config is not None and "do_lower_case" in model_config.encoder_config): From e59cc81500250c37987f205398c6945b03010988 Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Tue, 26 Nov 2024 16:39:29 +0200 Subject: [PATCH 11/14] format Signed-off-by: Tomer Asida --- vllm/transformers_utils/tokenizer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vllm/transformers_utils/tokenizer.py b/vllm/transformers_utils/tokenizer.py index 85d67b1982407..629d80b820e85 100644 --- a/vllm/transformers_utils/tokenizer.py +++ b/vllm/transformers_utils/tokenizer.py @@ -33,9 +33,9 @@ def get_threadsafe_tokenizer(tokenizer: AnyTokenizer) -> AnyTokenizer: lock = threading.Lock() class ThreadsafeTokenizer(tokenizer.__class__): # type: ignore + def __call__(self, *args, **kwargs): with lock: - logger.info(f"using lock in thread {threading.get_ident()}") return super().__call__(*args, **kwargs) ThreadsafeTokenizer.__name__ = f"Threadsafe{tokenizer.__class__.__name__}" From f0c0a2fdbd7657a3e323d5d364019ea4de873bc5 Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Tue, 26 Nov 2024 23:02:52 +0200 Subject: [PATCH 12/14] Use ThreadPoolExecutor with max_workers=1 to make tokenization async. No need for threadsafe tokenizer anymore since all tokenization happens on the same thread Signed-off-by: Tomer Asida --- vllm/engine/multiprocessing/client.py | 1 - vllm/entrypoints/openai/serving_engine.py | 8 ++++-- vllm/entrypoints/openai/serving_score.py | 3 ++- vllm/transformers_utils/tokenizer.py | 27 ------------------- .../tokenizer_group/__init__.py | 6 ++--- vllm/utils.py | 8 ++++-- 6 files changed, 16 insertions(+), 37 deletions(-) diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index 718973d22a228..fe21c58c775fe 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -94,7 +94,6 @@ def __init__(self, ipc_path: str, engine_config: VllmConfig, scheduler_config=engine_config.scheduler_config, parallel_config=engine_config.parallel_config, enable_lora=bool(engine_config.lora_config), - use_threadsafe=True, ) self.input_preprocessor = InputPreprocessor(self.model_config, self.tokenizer) diff --git a/vllm/entrypoints/openai/serving_engine.py b/vllm/entrypoints/openai/serving_engine.py index d3ec1361ac630..8232c6116c1bd 100644 --- a/vllm/entrypoints/openai/serving_engine.py +++ b/vllm/entrypoints/openai/serving_engine.py @@ -1,5 +1,6 @@ import json import pathlib +from concurrent.futures.thread import ThreadPoolExecutor from dataclasses import dataclass from http import HTTPStatus from typing import (Any, Callable, Dict, Iterable, Iterator, List, Mapping, @@ -140,10 +141,13 @@ def __init__( self.request_logger = request_logger self.return_tokens_as_token_ids = return_tokens_as_token_ids + self._tokenizer_executor = ThreadPoolExecutor(max_workers=1) + self._tokenize_prompt_input_async = make_async( - self._tokenize_prompt_input) + self._tokenize_prompt_input, executor=self._tokenizer_executor) self._tokenize_prompt_input_or_inputs_async = make_async( - self._tokenize_prompt_input_or_inputs) + self._tokenize_prompt_input_or_inputs, + executor=self._tokenizer_executor) async def show_available_models(self) -> ModelList: """Show available models. Right now we only have one model.""" diff --git a/vllm/entrypoints/openai/serving_score.py b/vllm/entrypoints/openai/serving_score.py index bf54730a42023..7cd8ff08b5608 100644 --- a/vllm/entrypoints/openai/serving_score.py +++ b/vllm/entrypoints/openai/serving_score.py @@ -145,7 +145,8 @@ async def create_score( tokenization_kwargs["truncation"] = True tokenization_kwargs["max_length"] = truncate_prompt_tokens - tokenize_async = make_async(tokenizer.__call__) + tokenize_async = make_async(tokenizer.__call__, + executor=self._tokenizer_executor) prompt_inputs = await tokenize_async(text=q, text_pair=t, **tokenization_kwargs) diff --git a/vllm/transformers_utils/tokenizer.py b/vllm/transformers_utils/tokenizer.py index 629d80b820e85..54f9f895fe541 100644 --- a/vllm/transformers_utils/tokenizer.py +++ b/vllm/transformers_utils/tokenizer.py @@ -1,5 +1,4 @@ import os -import threading import warnings from pathlib import Path from types import MethodType @@ -22,28 +21,6 @@ MistralTokenizer] -def get_threadsafe_tokenizer(tokenizer: AnyTokenizer) -> AnyTokenizer: - """Get a threadsafe tokenizer. - - This will patch the tokenizer object in place. - - This is necessary because transformers tokenizers are not threadsafe - by default, and can lead to crashes when used in a multithreaded.""" - - lock = threading.Lock() - - class ThreadsafeTokenizer(tokenizer.__class__): # type: ignore - - def __call__(self, *args, **kwargs): - with lock: - return super().__call__(*args, **kwargs) - - ThreadsafeTokenizer.__name__ = f"Threadsafe{tokenizer.__class__.__name__}" - - tokenizer.__class__ = ThreadsafeTokenizer - return tokenizer - - def get_cached_tokenizer(tokenizer: AnyTokenizer) -> AnyTokenizer: """Get tokenizer with cached properties. @@ -114,7 +91,6 @@ def get_tokenizer( trust_remote_code: bool = False, revision: Optional[str] = None, download_dir: Optional[str] = None, - use_threadsafe: bool = False, **kwargs, ) -> AnyTokenizer: """Gets a tokenizer for the given model name via HuggingFace or ModelScope. @@ -200,9 +176,6 @@ def get_tokenizer( "slowdown. Consider using a fast tokenizer instead.") tokenizer = get_cached_tokenizer(tokenizer) - if use_threadsafe: - tokenizer = get_threadsafe_tokenizer(tokenizer) - return tokenizer diff --git a/vllm/transformers_utils/tokenizer_group/__init__.py b/vllm/transformers_utils/tokenizer_group/__init__.py index d1e52ceebda46..6a114b513f382 100644 --- a/vllm/transformers_utils/tokenizer_group/__init__.py +++ b/vllm/transformers_utils/tokenizer_group/__init__.py @@ -16,16 +16,14 @@ def init_tokenizer_from_configs(model_config: ModelConfig, scheduler_config: SchedulerConfig, parallel_config: ParallelConfig, - enable_lora: bool, - use_threadsafe: bool = False): + enable_lora: bool): init_kwargs = dict(tokenizer_id=model_config.tokenizer, enable_lora=enable_lora, max_num_seqs=scheduler_config.max_num_seqs, max_input_length=None, tokenizer_mode=model_config.tokenizer_mode, trust_remote_code=model_config.trust_remote_code, - revision=model_config.tokenizer_revision, - use_threadsafe=use_threadsafe) + revision=model_config.tokenizer_revision) if (model_config.encoder_config is not None and "do_lower_case" in model_config.encoder_config): diff --git a/vllm/utils.py b/vllm/utils.py index dd4283e3ac381..fbcf1323c79e4 100644 --- a/vllm/utils.py +++ b/vllm/utils.py @@ -1,5 +1,6 @@ import argparse import asyncio +import concurrent import contextlib import datetime import enum @@ -351,7 +352,10 @@ def in_wsl() -> bool: return "microsoft" in " ".join(uname()).lower() -def make_async(func: Callable[P, T]) -> Callable[P, Awaitable[T]]: +def make_async( + func: Callable[P, T], + executor: Optional[concurrent.futures.Executor] = None +) -> Callable[P, Awaitable[T]]: """Take a blocking function, and run it on in an executor thread. This function prevents the blocking function from blocking the @@ -362,7 +366,7 @@ def make_async(func: Callable[P, T]) -> Callable[P, Awaitable[T]]: def _async_wrapper(*args: P.args, **kwargs: P.kwargs) -> asyncio.Future: loop = asyncio.get_event_loop() p_func = partial(func, *args, **kwargs) - return loop.run_in_executor(executor=None, func=p_func) + return loop.run_in_executor(executor=executor, func=p_func) return _async_wrapper From b35a0630fa4213a1638d03bf6bdacf3628104ae0 Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Wed, 27 Nov 2024 13:49:50 +0200 Subject: [PATCH 13/14] Add tests to validate that (1) truncated and non-truncated requests can be sent concurrently and (2) that /health response time is short under high tokenization load Signed-off-by: Tomer Asida --- .../openai/test_async_tokenization.py | 137 ++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 tests/entrypoints/openai/test_async_tokenization.py diff --git a/tests/entrypoints/openai/test_async_tokenization.py b/tests/entrypoints/openai/test_async_tokenization.py new file mode 100644 index 0000000000000..c52f7dd658f31 --- /dev/null +++ b/tests/entrypoints/openai/test_async_tokenization.py @@ -0,0 +1,137 @@ +import asyncio +import contextlib +import random +import time +from typing import Callable + +import openai +import pytest +import pytest_asyncio +import requests + +from tests.utils import RemoteOpenAIServer + +MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct" + + +@pytest.fixture(scope="module") +def server(): # noqa: F811 + args = [ + # use half precision for speed and memory savings in CI environment + "--dtype", + "bfloat16", + "--max-model-len", + "8192", + "--enforce-eager", + "--max-num-seqs", + "128", + "--load-format", + "dummy", + ] + + with RemoteOpenAIServer(MODEL_NAME, args) as remote_server: + yield remote_server + + +@pytest_asyncio.fixture +async def client(server): + async with server.get_async_client() as async_client: + yield async_client + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ids=["completion", "chat"], + argnames=["create_func_gen", "content_body"], + argvalues=[ + (lambda x: x.completions.create, { + "prompt": " ".join(['A'] * 10_000) + }), + (lambda x: x.chat.completions.create, { + "messages": [{ + "role": "user", + "content": " ".join(['A'] * 10_000) + }] + }), + ], +) +async def test_with_and_without_truncate( + server: RemoteOpenAIServer, + client: openai.AsyncOpenAI, + create_func_gen: Callable, + content_body: dict, +): + create_func = create_func_gen(client) + body = {"model": MODEL_NAME, **content_body, "max_tokens": 10} + + num_requests = 10 + truncate_prompt_tokens = ([1000] * (num_requests // 2) + [None] * + (num_requests - num_requests // 2)) + random.shuffle(truncate_prompt_tokens) + + bodies = [{ + **body, "extra_body": { + 'truncate_prompt_tokens': t + } + } for t in truncate_prompt_tokens] + + async def get_status_code(**kwargs): + try: + await create_func(**kwargs) + return 200 + except openai.APIStatusError as e: + return e.status_code + + responses = await asyncio.gather(*[get_status_code(**b) for b in bodies]) + assert 500 not in responses + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + ids=["single completion", "multiple completions", "chat"], + argnames=["create_func_gen", "content_body"], + argvalues=[ + (lambda x: x.completions.create, { + "prompt": " ".join(['A'] * 300_000) + }), + (lambda x: x.completions.create, { + "prompt": [" ".join(['A'] * 300_000)] * 2 + }), + (lambda x: x.chat.completions.create, { + "messages": [{ + "role": "user", + "content": " ".join(['A'] * 300_000) + }] + }), + ], +) +async def test_healthcheck_response_time( + server: RemoteOpenAIServer, + client: openai.AsyncOpenAI, + create_func_gen: Callable, + content_body: dict, +): + num_requests = 50 + + create_func = create_func_gen(client) + body = {"model": MODEL_NAME, **content_body, "max_tokens": 10} + + def get_response_time(url): + start_time = time.monotonic() + res = requests.get(url) + end_time = time.monotonic() + assert res.status_code == 200 + return end_time - start_time + + no_load_response_time = get_response_time(server.url_for("health")) + tasks = [ + asyncio.create_task(create_func(**body)) for _ in range(num_requests) + ] + await asyncio.sleep(1) + load_response_time = get_response_time(server.url_for("health")) + + with contextlib.suppress(openai.APIStatusError): + await asyncio.gather(*tasks) + + assert load_response_time < 100 * no_load_response_time + assert load_response_time < 0.1 From ff1d6a956e0d09b327480a984685f118a4f3ef87 Mon Sep 17 00:00:00 2001 From: Tomer Asida Date: Wed, 27 Nov 2024 14:05:59 +0200 Subject: [PATCH 14/14] add comment Signed-off-by: Tomer Asida --- tests/entrypoints/openai/test_async_tokenization.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/entrypoints/openai/test_async_tokenization.py b/tests/entrypoints/openai/test_async_tokenization.py index c52f7dd658f31..fcce8b46c4344 100644 --- a/tests/entrypoints/openai/test_async_tokenization.py +++ b/tests/entrypoints/openai/test_async_tokenization.py @@ -127,7 +127,7 @@ def get_response_time(url): tasks = [ asyncio.create_task(create_func(**body)) for _ in range(num_requests) ] - await asyncio.sleep(1) + await asyncio.sleep(1) # give the tasks a chance to start running load_response_time = get_response_time(server.url_for("health")) with contextlib.suppress(openai.APIStatusError):