diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 042eee06ab..a38bf17a11 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -19,6 +19,17 @@ from feast.permissions.permission import Permission from feast.project import Project from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList +from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList +from feast.protos.feast.core.FeatureService_pb2 import ( + FeatureServiceList as FeatureServiceProtoList, +) +from feast.protos.feast.core.FeatureView_pb2 import ( + FeatureViewList as FeatureViewProtoList, +) +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureViewList as OnDemandFeatureViewProtoList, +) from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView @@ -467,3 +478,100 @@ def _start_thread_async_refresh(self, cache_ttl_seconds): def _exit_handler(self): self.registry_refresh_thread.cancel() + + # Methods to improve the registry calls + + @abstractmethod + def _list_feature_views_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> FeatureViewProtoList: + pass + + def list_feature_views_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> FeatureViewProtoList: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_feature_views_proto( + self.cached_registry_proto, project, tags + ) + return self._list_feature_views_proto(project, tags) + + @abstractmethod + def _list_entities_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> EntityProtoList: + pass + + def list_entities_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> EntityProtoList: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_entities_proto( + self.cached_registry_proto, project, tags + ) + return self._list_entities_proto(project, tags) + + @abstractmethod + def _list_data_sources_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> DataSourceProtoList: + pass + + def list_data_sources_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> DataSourceProtoList: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_data_sources_proto( + self.cached_registry_proto, project, tags + ) + return self._list_data_sources_proto(project, tags) + + @abstractmethod + def _list_on_demand_feature_views_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> OnDemandFeatureViewProtoList: + pass + + def list_on_demand_feature_views_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> OnDemandFeatureViewProtoList: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_on_demand_feature_views_proto( + self.cached_registry_proto, project, tags + ) + return self._list_on_demand_feature_views_proto(project, tags) + + @abstractmethod + def _list_feature_services_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> FeatureServiceProtoList: + pass + + def list_feature_services_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> FeatureServiceProtoList: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_feature_services_proto( + self.cached_registry_proto, project, tags + ) + return self._list_feature_services_proto(project, tags) diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index fc5c3f6671..d7de12a681 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -21,6 +21,17 @@ from feast.permissions.permission import Permission from feast.project import Project from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList +from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList +from feast.protos.feast.core.FeatureService_pb2 import ( + FeatureServiceList as FeatureServiceProtoList, +) +from feast.protos.feast.core.FeatureView_pb2 import ( + FeatureViewList as FeatureViewProtoList, +) +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureViewList as OnDemandFeatureViewProtoList, +) from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.saved_dataset import SavedDataset, ValidationReference @@ -367,3 +378,68 @@ def get_project(registry_proto: RegistryProto, name: str) -> Project: if projects_proto.spec.name == name: return Project.from_proto(projects_proto) raise ProjectObjectNotFoundException(name=name) + + +@registry_proto_cache_with_tags +def list_feature_views_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> FeatureViewProtoList: + feature_views: FeatureViewProtoList = FeatureViewProtoList() + for feature_view_proto in registry_proto.feature_views: + if feature_view_proto.spec.project == project and utils.has_all_tags( + feature_view_proto.spec.tags, tags + ): + feature_views.featureviews.append(feature_view_proto) + return feature_views + + +@registry_proto_cache_with_tags +def list_feature_services_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> FeatureServiceProtoList: + feature_services = FeatureServiceProtoList() + for feature_service_proto in registry_proto.feature_services: + if feature_service_proto.spec.project == project and utils.has_all_tags( + feature_service_proto.spec.tags, tags + ): + feature_services.featureservices.append(feature_service_proto) + return feature_services + + +@registry_proto_cache_with_tags +def list_on_demand_feature_views_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> OnDemandFeatureViewProtoList: + on_demand_feature_views = OnDemandFeatureViewProtoList() + for on_demand_feature_view in registry_proto.on_demand_feature_views: + if on_demand_feature_view.spec.project == project and utils.has_all_tags( + on_demand_feature_view.spec.tags, tags + ): + on_demand_feature_views.ondemandfeatureviews.append(on_demand_feature_view) + return on_demand_feature_views + + +@registry_proto_cache_with_tags +def list_entities_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> EntityProtoList: + entities = EntityProtoList() + for entity_proto in registry_proto.entities: + if entity_proto.spec.project == project and utils.has_all_tags( + entity_proto.spec.tags, tags + ): + entities.entities.append(entity_proto) + return entities + + +@registry_proto_cache_with_tags +def list_data_sources_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> DataSourceProtoList: + data_sources = DataSourceProtoList() + for data_source_proto in registry_proto.data_sources: + if data_source_proto.project == project and utils.has_all_tags( + data_source_proto.tags, tags + ): + data_sources.datasources.append(data_source_proto) + return data_sources diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index a1ea1707a6..6ca542e79a 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -4,7 +4,7 @@ from datetime import datetime, timezone from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Union, cast +from typing import Any, Callable, Dict, List, Optional, Type, Union, cast from pydantic import StrictInt, StrictStr from sqlalchemy import ( # type: ignore @@ -50,15 +50,26 @@ from feast.project import Project from feast.project_metadata import ProjectMetadata from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto +from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList from feast.protos.feast.core.FeatureService_pb2 import ( FeatureService as FeatureServiceProto, ) +from feast.protos.feast.core.FeatureService_pb2 import ( + FeatureServiceList as FeatureServiceProtoList, +) from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto +from feast.protos.feast.core.FeatureView_pb2 import ( + FeatureViewList as FeatureViewProtoList, +) from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( OnDemandFeatureView as OnDemandFeatureViewProto, ) +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureViewList as OnDemandFeatureViewProtoList, +) from feast.protos.feast.core.Permission_pb2 import Permission as PermissionProto from feast.protos.feast.core.Project_pb2 import Project as ProjectProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto @@ -1326,3 +1337,116 @@ def get_project_metadata( datetime.utcfromtimestamp(int(metadata_value)) ) return project_metadata_model + + def get_objects_list( + self, proto_class: Type + ) -> Union[ + FeatureViewProtoList, + OnDemandFeatureViewProtoList, + EntityProtoList, + DataSourceProtoList, + FeatureServiceProtoList, + ]: + # Define the mapping from proto_class to list type + proto_class_to_list = { + FeatureViewProto: FeatureViewProtoList, + OnDemandFeatureViewProto: OnDemandFeatureViewProtoList, + EntityProto: EntityProtoList, + DataSourceProto: DataSourceProtoList, + FeatureServiceProto: FeatureServiceProtoList, + } + proto_list = proto_class_to_list.get(proto_class, None) + if proto_list is None: + raise ValueError(f"Unsupported proto class: {proto_class}") + return proto_list() + + def _list_objects_proto( + self, + table: Table, + project: str, + proto_class: Any, + proto_field_name: str, + tags: Optional[dict[str, str]] = None, + ): + with self.read_engine.begin() as conn: + stmt = select(table).where(table.c.project_id == project) + rows = conn.execute(stmt).all() + if rows: + objects = self.get_objects_list(proto_class) + for row in rows: + obj = proto_class.FromString(row._mapping[proto_field_name]) + if utils.has_all_tags( + dict( + obj.tags + if isinstance(objects, DataSourceProtoList) + else obj.spec.tags + ), + tags, + ): + if isinstance(objects, DataSourceProtoList): + objects.datasources.append(obj) + elif isinstance(objects, FeatureViewProtoList): + objects.featureviews.append(obj) + elif isinstance(objects, OnDemandFeatureViewProtoList): + objects.ondemandfeatureviews.append(obj) + elif isinstance(objects, EntityProtoList): + objects.entities.append(obj) + elif isinstance(objects, FeatureServiceProtoList): + objects.featureservices.append(obj) + return objects + return [] + + def _list_feature_services_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> FeatureServiceProtoList: + return self._list_objects_proto( + feature_services, + project, + FeatureServiceProto, + "feature_service_proto", + tags=tags, + ) + + def _list_feature_views_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> FeatureViewProtoList: + return self._list_objects_proto( + feature_views, + project, + FeatureViewProto, + "feature_view_proto", + tags=tags, + ) + + def _list_on_demand_feature_views_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> OnDemandFeatureViewProtoList: + return self._list_objects_proto( + on_demand_feature_views, + project, + OnDemandFeatureViewProto, + "feature_view_proto", + tags=tags, + ) + + def _list_entities_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> EntityProtoList: + return self._list_objects_proto( + entities, + project, + EntityProto, + "entity_proto", + tags=tags, + ) + + def _list_data_sources_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> DataSourceProtoList: + return self._list_objects_proto( + data_sources, + project, + DataSourceProto, + "data_source_proto", + tags=tags, + )