|
1 | 1 | """REST API endpoints for registry lineage and relationships."""
|
2 | 2 |
|
| 3 | +import logging |
3 | 4 | from typing import Optional
|
4 | 5 |
|
5 | 6 | from fastapi import APIRouter, Depends, Query
|
6 | 7 |
|
7 | 8 | from feast.api.registry.rest.rest_utils import (
|
8 | 9 | create_grpc_pagination_params,
|
9 | 10 | create_grpc_sorting_params,
|
| 11 | + get_all_project_resources, |
10 | 12 | get_pagination_params,
|
11 | 13 | get_sorting_params,
|
12 | 14 | grpc_call,
|
13 | 15 | )
|
14 | 16 | from feast.protos.feast.registry import RegistryServer_pb2
|
15 | 17 |
|
| 18 | +logger = logging.getLogger(__name__) |
| 19 | + |
16 | 20 |
|
17 | 21 | def get_lineage_router(grpc_handler) -> APIRouter:
|
18 | 22 | router = APIRouter()
|
@@ -141,69 +145,44 @@ def get_complete_registry_data(
|
141 | 145 | )
|
142 | 146 | lineage_response = grpc_call(grpc_handler.GetRegistryLineage, lineage_req)
|
143 | 147 |
|
144 |
| - # Get all registry objects |
145 |
| - entities_req = RegistryServer_pb2.ListEntitiesRequest( |
146 |
| - project=project, |
147 |
| - allow_cache=allow_cache, |
148 |
| - pagination=grpc_pagination, |
149 |
| - sorting=grpc_sorting, |
150 |
| - ) |
151 |
| - entities_response = grpc_call(grpc_handler.ListEntities, entities_req) |
152 |
| - |
153 |
| - data_sources_req = RegistryServer_pb2.ListDataSourcesRequest( |
154 |
| - project=project, |
155 |
| - allow_cache=allow_cache, |
156 |
| - pagination=grpc_pagination, |
157 |
| - sorting=grpc_sorting, |
158 |
| - ) |
159 |
| - data_sources_response = grpc_call( |
160 |
| - grpc_handler.ListDataSources, data_sources_req |
161 |
| - ) |
162 |
| - |
163 |
| - feature_views_req = RegistryServer_pb2.ListAllFeatureViewsRequest( |
164 |
| - project=project, |
165 |
| - allow_cache=allow_cache, |
166 |
| - pagination=grpc_pagination, |
167 |
| - sorting=grpc_sorting, |
168 |
| - ) |
169 |
| - feature_views_response = grpc_call( |
170 |
| - grpc_handler.ListAllFeatureViews, feature_views_req |
171 |
| - ) |
172 |
| - |
173 |
| - feature_services_req = RegistryServer_pb2.ListFeatureServicesRequest( |
174 |
| - project=project, |
175 |
| - allow_cache=allow_cache, |
176 |
| - pagination=grpc_pagination, |
177 |
| - sorting=grpc_sorting, |
178 |
| - ) |
179 |
| - feature_services_response = grpc_call( |
180 |
| - grpc_handler.ListFeatureServices, feature_services_req |
181 |
| - ) |
182 |
| - |
183 |
| - features_req = RegistryServer_pb2.ListFeaturesRequest( |
184 |
| - project=project, |
185 |
| - pagination=grpc_pagination, |
186 |
| - sorting=grpc_sorting, |
| 148 | + # Get all registry objects using shared helper function |
| 149 | + project_resources, pagination, errors = get_all_project_resources( |
| 150 | + grpc_handler, |
| 151 | + project, |
| 152 | + allow_cache, |
| 153 | + tags={}, |
| 154 | + pagination_params=pagination_params, |
| 155 | + sorting_params=sorting_params, |
187 | 156 | )
|
188 |
| - features_response = grpc_call(grpc_handler.ListFeatures, features_req) |
189 |
| - |
| 157 | + if errors and not project_resources: |
| 158 | + logger.error( |
| 159 | + f"Error getting project resources for project {project}: {errors}" |
| 160 | + ) |
| 161 | + return { |
| 162 | + "project": project, |
| 163 | + "objects": {}, |
| 164 | + "relationships": [], |
| 165 | + "indirectRelationships": [], |
| 166 | + "pagination": {}, |
| 167 | + } |
190 | 168 | return {
|
191 | 169 | "project": project,
|
192 | 170 | "objects": {
|
193 |
| - "entities": entities_response.get("entities", []), |
194 |
| - "dataSources": data_sources_response.get("dataSources", []), |
195 |
| - "featureViews": feature_views_response.get("featureViews", []), |
196 |
| - "featureServices": feature_services_response.get("featureServices", []), |
197 |
| - "features": features_response.get("features", []), |
| 171 | + "entities": project_resources.get("entities", []), |
| 172 | + "dataSources": project_resources.get("dataSources", []), |
| 173 | + "featureViews": project_resources.get("featureViews", []), |
| 174 | + "featureServices": project_resources.get("featureServices", []), |
| 175 | + "features": project_resources.get("features", []), |
198 | 176 | },
|
199 | 177 | "relationships": lineage_response.get("relationships", []),
|
200 | 178 | "indirectRelationships": lineage_response.get("indirectRelationships", []),
|
201 | 179 | "pagination": {
|
202 |
| - "entities": entities_response.get("pagination", {}), |
203 |
| - "dataSources": data_sources_response.get("pagination", {}), |
204 |
| - "featureViews": feature_views_response.get("pagination", {}), |
205 |
| - "featureServices": feature_services_response.get("pagination", {}), |
206 |
| - "features": features_response.get("pagination", {}), |
| 180 | + # Get pagination metadata from project_resources if available, otherwise use empty dicts |
| 181 | + "entities": pagination.get("entities", {}), |
| 182 | + "dataSources": pagination.get("dataSources", {}), |
| 183 | + "featureViews": pagination.get("featureViews", {}), |
| 184 | + "featureServices": pagination.get("featureServices", {}), |
| 185 | + "features": pagination.get("features", {}), |
207 | 186 | "relationships": lineage_response.get("relationshipsPagination", {}),
|
208 | 187 | "indirectRelationships": lineage_response.get(
|
209 | 188 | "indirectRelationshipsPagination", {}
|
@@ -265,61 +244,38 @@ def get_complete_registry_data_all(
|
265 | 244 | allow_cache=allow_cache,
|
266 | 245 | )
|
267 | 246 | lineage_response = grpc_call(grpc_handler.GetRegistryLineage, lineage_req)
|
268 |
| - # Get all registry objects |
269 |
| - entities_req = RegistryServer_pb2.ListEntitiesRequest( |
270 |
| - project=project_name, |
271 |
| - allow_cache=allow_cache, |
272 |
| - ) |
273 |
| - entities_response = grpc_call(grpc_handler.ListEntities, entities_req) |
274 |
| - data_sources_req = RegistryServer_pb2.ListDataSourcesRequest( |
275 |
| - project=project_name, |
276 |
| - allow_cache=allow_cache, |
277 |
| - ) |
278 |
| - data_sources_response = grpc_call( |
279 |
| - grpc_handler.ListDataSources, data_sources_req |
280 |
| - ) |
281 |
| - feature_views_req = RegistryServer_pb2.ListAllFeatureViewsRequest( |
282 |
| - project=project_name, |
283 |
| - allow_cache=allow_cache, |
284 |
| - ) |
285 |
| - feature_views_response = grpc_call( |
286 |
| - grpc_handler.ListAllFeatureViews, feature_views_req |
287 |
| - ) |
288 |
| - feature_services_req = RegistryServer_pb2.ListFeatureServicesRequest( |
289 |
| - project=project_name, |
290 |
| - allow_cache=allow_cache, |
291 |
| - ) |
292 |
| - feature_services_response = grpc_call( |
293 |
| - grpc_handler.ListFeatureServices, feature_services_req |
294 |
| - ) |
295 | 247 |
|
296 |
| - features_req = RegistryServer_pb2.ListFeaturesRequest( |
297 |
| - project=project_name, |
| 248 | + # Get all registry objects using shared helper function |
| 249 | + project_resources, _, errors = get_all_project_resources( |
| 250 | + grpc_handler, project_name, allow_cache, tags={} |
298 | 251 | )
|
299 |
| - features_response = grpc_call(grpc_handler.ListFeatures, features_req) |
| 252 | + |
| 253 | + if errors and not project_resources: |
| 254 | + logger.error( |
| 255 | + f"Error getting project resources for project {project_name}: {errors}" |
| 256 | + ) |
| 257 | + continue |
300 | 258 |
|
301 | 259 | # Add project field to each object
|
302 |
| - for entity in entities_response.get("entities", []): |
| 260 | + for entity in project_resources.get("entities", []): |
303 | 261 | entity["project"] = project_name
|
304 |
| - for ds in data_sources_response.get("dataSources", []): |
| 262 | + for ds in project_resources.get("dataSources", []): |
305 | 263 | ds["project"] = project_name
|
306 |
| - for fv in feature_views_response.get("featureViews", []): |
| 264 | + for fv in project_resources.get("featureViews", []): |
307 | 265 | fv["project"] = project_name
|
308 |
| - for fs in feature_services_response.get("featureServices", []): |
| 266 | + for fs in project_resources.get("featureServices", []): |
309 | 267 | fs["project"] = project_name
|
310 |
| - for feat in features_response.get("features", []): |
| 268 | + for feat in project_resources.get("features", []): |
311 | 269 | feat["project"] = project_name
|
312 | 270 | all_data.append(
|
313 | 271 | {
|
314 | 272 | "project": project_name,
|
315 | 273 | "objects": {
|
316 |
| - "entities": entities_response.get("entities", []), |
317 |
| - "dataSources": data_sources_response.get("dataSources", []), |
318 |
| - "featureViews": feature_views_response.get("featureViews", []), |
319 |
| - "featureServices": feature_services_response.get( |
320 |
| - "featureServices", [] |
321 |
| - ), |
322 |
| - "features": features_response.get("features", []), |
| 274 | + "entities": project_resources.get("entities", []), |
| 275 | + "dataSources": project_resources.get("dataSources", []), |
| 276 | + "featureViews": project_resources.get("featureViews", []), |
| 277 | + "featureServices": project_resources.get("featureServices", []), |
| 278 | + "features": project_resources.get("features", []), |
323 | 279 | },
|
324 | 280 | "relationships": lineage_response.get("relationships", []),
|
325 | 281 | "indirectRelationships": lineage_response.get(
|
|
0 commit comments