Skip to content

Commit

Permalink
Skip convergence check when bootstrapping Marqo (#1036)
Browse files Browse the repository at this point in the history
  • Loading branch information
papa99do authored Nov 19, 2024
1 parent 65029cf commit 14d067d
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 49 deletions.
43 changes: 31 additions & 12 deletions src/marqo/core/index_management/index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,26 @@ def bootstrap_vespa(self) -> bool:
Returns:
True if Vespa was bootstrapped, False if it was already up-to-date
"""
with self._vespa_deployment_lock():
vespa_app = self._get_vespa_application(check_configured=False, need_binary_file_support=True)

to_version = version.get_version()
from_version = vespa_app.get_marqo_config().version if vespa_app.is_configured else None
# We skip the Vespa convergence check here so that a Marqo instance can be bootstrapped even when Vespa has
# not converged.
to_version = version.get_version()
vespa_app_for_version_check = self._get_vespa_application(check_configured=False, need_binary_file_support=True,
check_for_application_convergence=False)
from_version = vespa_app_for_version_check.get_marqo_config().version \
if vespa_app_for_version_check.is_configured else None

if from_version and semver.VersionInfo.parse(from_version) >= semver.VersionInfo.parse(to_version):
# skip bootstrapping if already bootstrapped to this version or later
return False

if from_version and semver.VersionInfo.parse(from_version) >= semver.VersionInfo.parse(to_version):
# skip bootstrapping if already bootstrapped to this version or later
return False
with self._vespa_deployment_lock():
# Initialise another session based on the latest active Vespa session. We do this again while holding the
# distributed lock because the Vespa application might have been changed by other operations while we were
# waiting for the lock. This time, we error out if the Vespa application has not converged, which reduces
# the chance of running into race conditions.
vespa_app = self._get_vespa_application(check_configured=False, need_binary_file_support=True,
check_for_application_convergence=True)

# Only retrieve existing indexes when the Vespa app is not configured and the index settings schema exists
existing_indexes = self._get_existing_indexes() if not vespa_app.is_configured and \
Expand All @@ -105,8 +116,12 @@ def bootstrap_vespa(self) -> bool:
return True

def rollback_vespa(self) -> None:
"""
Roll back Vespa application package to the previous version backed up in the current app package.
"""
with self._vespa_deployment_lock():
self._get_vespa_application(need_binary_file_support=True).rollback(version.get_version())
vespa_app = self._get_vespa_application(need_binary_file_support=True)
vespa_app.rollback(version.get_version())

def create_index(self, marqo_index_request: MarqoIndexRequest) -> MarqoIndex:
"""
Expand Down Expand Up @@ -274,15 +289,17 @@ def get_marqo_version(self) -> str:
"""
return self._get_vespa_application().get_marqo_config().version

def _get_vespa_application(self, check_configured: bool = True, need_binary_file_support: bool = False) \
-> VespaApplicationPackage:
def _get_vespa_application(self, check_configured: bool = True, need_binary_file_support: bool = False,
check_for_application_convergence: bool = True) -> VespaApplicationPackage:
"""
Retrieve a Vespa application package. Depending on whether we need to handle binary files and the Vespa version,
it uses a different implementation of VespaApplicationStore.
Args:
check_configured: if set to True, it checks whether the application package is configured or not.
need_binary_file_support: indicates whether the support for binary file is needed.
check_for_application_convergence: whether to check convergence of the Vespa app package. If set to True and
Vespa has not converged, this process fails with a VespaError.
Returns:
The VespaApplicationPackage instance we can use to do bootstrapping/rollback and any index operations.
Expand Down Expand Up @@ -314,13 +331,15 @@ def _get_vespa_application(self, check_configured: bool = True, need_binary_file
application_package_store = VespaApplicationFileStore(
vespa_client=self.vespa_client,
deploy_timeout=self._deployment_timeout_seconds,
wait_for_convergence_timeout=self._convergence_timeout_seconds
wait_for_convergence_timeout=self._convergence_timeout_seconds,
check_for_application_convergence=check_for_application_convergence
)
else:
application_package_store = ApplicationPackageDeploymentSessionStore(
vespa_client=self.vespa_client,
deploy_timeout=self._deployment_timeout_seconds,
wait_for_convergence_timeout=self._convergence_timeout_seconds
wait_for_convergence_timeout=self._convergence_timeout_seconds,
check_for_application_convergence=check_for_application_convergence
)

application = VespaApplicationPackage(application_package_store)
Expand Down
12 changes: 8 additions & 4 deletions src/marqo/core/index_management/vespa_application_package.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,11 @@ class VespaApplicationFileStore(VespaApplicationStore):
more details. This is the only viable option to deploy changes of binary files before Vespa version 8.382.22.
We implement this approach to support bootstrapping and rollback for Vespa versions prior to 8.382.22.
"""
def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int):
def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int,
check_for_application_convergence: bool = True):
super().__init__(vespa_client, deploy_timeout, wait_for_convergence_timeout)
self._app_root_path = vespa_client.download_application(check_for_application_convergence=True)
self._app_root_path = vespa_client.download_application(
check_for_application_convergence=check_for_application_convergence)

def _full_path(self, *paths: str) -> str:
return os.path.join(self._app_root_path, *paths)
Expand Down Expand Up @@ -483,9 +485,11 @@ class ApplicationPackageDeploymentSessionStore(VespaApplicationStore):
See https://docs.vespa.ai/en/reference/deploy-rest-api-v2.html#create-session for more details.
However, this approach does not support binary files for Vespa versions prior to 8.382.22.
"""
def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int):
def __init__(self, vespa_client: VespaClient, deploy_timeout: int, wait_for_convergence_timeout: int,
check_for_application_convergence: bool = True):
super().__init__(vespa_client, deploy_timeout, wait_for_convergence_timeout)
self._content_base_url, self._prepare_url = vespa_client.create_deployment_session()
self._content_base_url, self._prepare_url = vespa_client.create_deployment_session(
check_for_application_convergence)
self._all_contents = vespa_client.list_contents(self._content_base_url)

def file_exists(self, *paths: str) -> bool:
Expand Down
12 changes: 9 additions & 3 deletions src/marqo/vespa/vespa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,12 @@ def deploy_application(self, application: str, timeout: int = 60) -> None:

self._raise_for_status(response)

def create_deployment_session(self) -> Tuple[str, str]:
def create_deployment_session(self, check_for_application_convergence: bool = True) -> Tuple[str, str]:
"""
Create a Vespa deployment session.
Args:
check_for_application_convergence: whether to check that the application has converged before creating a deployment session.
Returns:
Tuple[str, str]:
- content_base_url is the base url for contents in this session
Expand All @@ -107,7 +110,9 @@ def create_deployment_session(self) -> Tuple[str, str]:
via Zookeeper. Following requests should use content_base_url and prepare_url to make sure it can hit the right
config server that this session is created on.
"""
self.check_for_application_convergence()
if check_for_application_convergence:
self.check_for_application_convergence()

res = self._create_deploy_session(self.http_client)
content_base_url = res['content']
prepare_url = res['prepared']
Expand Down Expand Up @@ -193,7 +198,8 @@ def wait_for_application_convergence(self, timeout: int = 120) -> None:
except (httpx.TimeoutException, httpcore.TimeoutException):
logger.error("Marqo timed out waiting for Vespa application to converge. Will retry.")

raise VespaError(f"Vespa application did not converge within {timeout} seconds")
raise VespaError(f"Vespa application did not converge within {timeout} seconds. "
f"The convergence status is {self._get_convergence_status()}")

def query(self, yql: str, hits: int = 10, ranking: str = None, model_restrict: str = None,
query_features: Dict[str, Any] = None, timeout: float = None, **kwargs) -> QueryResult:
Expand Down
90 changes: 60 additions & 30 deletions tests/core/index_management/test_index_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def setUp(self):
zookeeper_client=self.zookeeper_client,
enable_index_operations=True,
deployment_timeout_seconds=30,
convergence_timeout_seconds=60)
convergence_timeout_seconds=120)
# this resets the application package to a clean state
self._test_dir = str(os.path.dirname(os.path.abspath(__file__)))
self._deploy_initial_app_package()
Expand Down Expand Up @@ -243,6 +243,36 @@ def test_rollback_should_succeed(self):
os.path.join(latest_version, *file)
)

@patch('marqo.vespa.vespa_client.VespaClient.check_for_application_convergence')
def test_bootstrap_and_rollback_should_skip_convergence_check(self, mock_check_convergence):
self.index_management.bootstrap_vespa()
mock_check_convergence.assert_not_called()

mock_check_convergence.reset_mock()

try:
self.index_management.rollback_vespa()
except ApplicationRollbackError:
pass
mock_check_convergence.assert_not_called()

@patch('marqo.vespa.vespa_client.VespaClient.check_for_application_convergence')
@patch('marqo.vespa.vespa_client.VespaClient.get_vespa_version')
def test_bootstrap_and_rollback_should_not_skip_convergence_check_for_older_vespa_version(self, mock_vespa_version,
mock_check_convergence):
mock_vespa_version.return_value = '8.382.21'

self.index_management.bootstrap_vespa()
mock_check_convergence.assert_called_once()

mock_check_convergence.reset_mock()

try:
self.index_management.rollback_vespa()
except ApplicationRollbackError:
pass
mock_check_convergence.assert_called_once()

def test_rollback_should_fail_when_target_version_is_current_version(self):
self.index_management.bootstrap_vespa()
with self.assertRaisesStrict(ApplicationRollbackError) as e:
Expand Down Expand Up @@ -304,45 +334,45 @@ def test_rollback_should_fail_when_admin_config_is_changed(self):
self.assertEqual("Aborting rollback. Reason: Vector store config has been changed since the last backup.",
str(e.exception))

def test_index_operation_methods_should_raise_error_if_index_operation_is_disabled(self):
# Create an index management instance with index operation disabled (by default)
self.index_management = IndexManagement(self.vespa_client, zookeeper_client=None)
def _index_operations(self, index_management: IndexManagement):
index_request_1 = self.structured_marqo_index_request(
fields=[FieldRequest(name='title', type=FieldType.Text)],
tensor_fields=['title']
)
index_request_2 = self.unstructured_marqo_index_request()

with self.assertRaisesStrict(InternalError):
self.index_management.create_index(index_request_1)

with self.assertRaisesStrict(InternalError):
self.index_management.batch_create_indexes([index_request_1, index_request_2])
return [
('create single index', lambda: index_management.create_index(index_request_1)),
('batch create indexes', lambda: index_management.batch_create_indexes([index_request_1, index_request_2])),
('delete single index', lambda: index_management.delete_index_by_name(index_request_1.name)),
('batch delete indexes', lambda: index_management.batch_delete_indexes_by_name([index_request_1.name, index_request_2.name])),
]

with self.assertRaisesStrict(InternalError):
self.index_management.delete_index_by_name(index_request_1.name)
def test_index_operation_methods_should_raise_error_if_index_operation_is_disabled(self):
index_management_without_zookeeper = IndexManagement(self.vespa_client, zookeeper_client=None)

with self.assertRaisesStrict(InternalError):
self.index_management.batch_delete_indexes_by_name([index_request_1.name, index_request_2.name])
for test_case, index_operation in self._index_operations(index_management_without_zookeeper):
with self.subTest(test_case):
with self.assertRaisesStrict(InternalError):
index_operation()

def test_index_operation_methods_should_raise_error_if_marqo_is_not_bootstrapped(self):
index_request_1 = self.structured_marqo_index_request(
fields=[FieldRequest(name='title', type=FieldType.Text)],
tensor_fields=['title']
)
index_request_2 = self.unstructured_marqo_index_request()

with self.assertRaisesStrict(ApplicationNotInitializedError):
self.index_management.create_index(index_request_1)

with self.assertRaisesStrict(ApplicationNotInitializedError):
self.index_management.batch_create_indexes([index_request_1, index_request_2])

with self.assertRaisesStrict(ApplicationNotInitializedError):
self.index_management.delete_index_by_name(index_request_1.name)

with self.assertRaisesStrict(ApplicationNotInitializedError):
self.index_management.batch_delete_indexes_by_name([index_request_1.name, index_request_2.name])
for test_case, index_operation in self._index_operations(self.index_management):
with self.subTest(test_case):
with self.assertRaisesStrict(ApplicationNotInitializedError):
index_operation()

@patch('marqo.vespa.vespa_client.VespaClient.check_for_application_convergence')
def test_index_operation_methods_should_check_convergence(self, mock_check_convergence):
for test_case, index_operation in self._index_operations(self.index_management):
with self.subTest(test_case):
try:
index_operation()
except ApplicationNotInitializedError:
pass

mock_check_convergence.assert_called_once()
mock_check_convergence.reset_mock()

def test_create_and_delete_index_should_succeed(self):
# merge batch create and delete happy path to save some testing time
Expand Down

0 comments on commit 14d067d

Please sign in to comment.