Skip to content

Commit

Permalink
Merge pull request #536 from vespa-engine/kkraune/feed-util
Browse files Browse the repository at this point in the history
Add utility to create a vespa feed
  • Loading branch information
lesters authored Aug 10, 2023
2 parents 0de590f + 303c374 commit 0001754
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
23 changes: 20 additions & 3 deletions vespa/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import aiohttp
import asyncio
import concurrent.futures
import json
from collections import Counter
from typing import Any, Optional, Dict, List, IO

Expand All @@ -30,13 +31,14 @@
)


def parse_feed_df(df: DataFrame, include_id: bool, id_field="id") -> List[Dict[str, Any]]:
def parse_feed_df(df: DataFrame, include_id: bool, id_field="id", id_prefix="") -> List[Dict[str, Any]]:
"""
Convert a df into batch format for feeding
:param df: DataFrame with the following required columns ["id"]. Additional columns are assumed to be fields.
:param include_id: Include id on the fields to be fed.
:param id_field: Name of the column containing the id field.
:param id_prefix: Add a string prefix to ID field, e.g. "id:namespace:schema::"
:return: List of Dict containing 'id' and 'fields'.
"""
required_columns = [id_field]
Expand All @@ -46,7 +48,7 @@ def parse_feed_df(df: DataFrame, include_id: bool, id_field="id") -> List[Dict[s
records = df.to_dict(orient="records")
batch = [
{
"id": record[id_field],
"id": record[id_field] if id_prefix == "" else id_prefix + str(record[id_field]),
"fields": record
if include_id
else {k: v for k, v in record.items() if k not in [id_field]},
Expand All @@ -56,6 +58,21 @@ def parse_feed_df(df: DataFrame, include_id: bool, id_field="id") -> List[Dict[s
return batch


def df_to_vespafeed(df: DataFrame, schema_name: str, id_field="id", namespace="") -> str:
"""
Convert a df into a string in Vespa JSON feed format,
see https://docs.vespa.ai/en/reference/document-json-format.html
:param df: DataFrame with the following required columns ["id"]. Additional columns are assumed to be fields.
:param schema_name: Schema name
:param id_field: Name of the column containing the id field.
:param namespace: Set if namespace != schema_name
:return: JSON string in Vespa feed format
"""
return json.dumps(parse_feed_df(df, True, id_field,
"id:{}:{}::".format(schema_name if namespace == "" else namespace, schema_name)))


def raise_for_status(response: Response) -> None:
"""
Raises an appropriate error if necessary.
Expand Down Expand Up @@ -418,7 +435,7 @@ def feed_batch(
:return: List of HTTP POST responses
"""
mini_batches = [
batch[i : i + batch_size] for i in range(0, len(batch), batch_size)
batch[i: i + batch_size] for i in range(0, len(batch), batch_size)
]
batch_http_responses = []
for idx, mini_batch in enumerate(mini_batches):
Expand Down
18 changes: 17 additions & 1 deletion vespa/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,32 @@

import json
import unittest

import pandas
import pytest
from unittest.mock import PropertyMock, patch
from pandas import DataFrame
from requests.models import HTTPError, Response

from vespa.package import ApplicationPackage, Schema, Document
from vespa.application import Vespa, parse_feed_df, raise_for_status
from vespa.application import Vespa, parse_feed_df, df_to_vespafeed, raise_for_status
from vespa.exceptions import VespaError


def test_df_to_vespafeed():
df = pandas.DataFrame({
"id": [0, 1, 2],
"body": ["body 1", "body 2", "body 3"]
})
feed = json.loads(df_to_vespafeed(df, "myschema", "id"))

assert feed[1]["fields"]["body"] == "body 2"
assert feed[0]["id"] == "id:myschema:myschema::0"

feed = json.loads(df_to_vespafeed(df, "myschema", "id", "mynamespace"))
assert feed[2]["id"] == "id:mynamespace:myschema::2"


class TestVespa(unittest.TestCase):
def test_end_point(self):
self.assertEqual(
Expand Down

0 comments on commit 0001754

Please sign in to comment.