-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat: Offline Store historical features retrieval based on datetime range in dask #5717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
41b66fb
fa6059c
3ddcc3b
1ed85ce
039b009
497bce6
b28ea19
5a1b6c5
7cafbdc
973d7cd
e7700d7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,6 +1,6 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import os | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| import uuid | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from datetime import datetime, timezone | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from datetime import datetime, timedelta, timezone | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from pathlib import Path | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -133,21 +133,53 @@ def get_historical_features( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| config: RepoConfig, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| feature_views: List[FeatureView], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| feature_refs: List[str], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| entity_df: Union[pd.DataFrame, str], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| entity_df: Optional[Union[pd.DataFrame, dd.DataFrame, str]], | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| registry: BaseRegistry, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| project: str, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| full_feature_names: bool = False, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| **kwargs, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) -> RetrievalJob: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| assert isinstance(config.offline_store, DaskOfflineStoreConfig) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for fv in feature_views: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| assert isinstance(fv.batch_source, FileSource) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not isinstance(entity_df, pd.DataFrame) and not isinstance( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| entity_df, dd.DataFrame | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| raise ValueError( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Allow non-entity mode using start/end timestamps to enable bounded retrievals without an input entity_df. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # This synthesizes a minimal entity_df solely to drive the existing join and metadata plumbing without | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # incurring source scans here; actual pushdowns can be layered in follow-ups if needed. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start_date: Optional[datetime] = kwargs.get("start_date", None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end_date: Optional[datetime] = kwargs.get("end_date", None) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| non_entity_mode = entity_df is None | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if non_entity_mode: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Default end_date to current time (UTC) to keep behavior predictable without extra parameters. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| end_date = end_date or datetime.now(timezone.utc) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if start_date is None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If start_date is given you have to make it tzaware ? |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| max_ttl_seconds = 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for fv in feature_views: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if fv.ttl and isinstance(fv.ttl, timedelta): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| max_ttl_seconds = max( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| max_ttl_seconds, int(fv.ttl.total_seconds()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if max_ttl_seconds > 0: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start_date = end_date - timedelta(seconds=max_ttl_seconds) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| else: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Keep default window bounded to avoid unbounded scans by default. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| start_date = end_date - timedelta(days=30) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
159
to
173
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # When start_date is not provided, choose a conservative lower bound using max TTL, otherwise fall back. | |
| if start_date is None: | |
| max_ttl_seconds = 0 | |
| for fv in feature_views: | |
| if fv.ttl and isinstance(fv.ttl, timedelta): | |
| max_ttl_seconds = max( | |
| max_ttl_seconds, int(fv.ttl.total_seconds()) | |
| ) | |
| if max_ttl_seconds > 0: | |
| start_date = end_date - timedelta(seconds=max_ttl_seconds) | |
| else: | |
| # Keep default window bounded to avoid unbounded scans by default. | |
| start_date = end_date - timedelta(days=30) | |
| # Compute TTL-based lower bound for start_date. | |
| max_ttl_seconds = 0 | |
| for fv in feature_views: | |
| if fv.ttl and isinstance(fv.ttl, timedelta): | |
| max_ttl_seconds = max( | |
| max_ttl_seconds, int(fv.ttl.total_seconds()) | |
| ) | |
| if max_ttl_seconds > 0: | |
| ttl_lower_bound = end_date - timedelta(seconds=max_ttl_seconds) | |
| else: | |
| # Keep default window bounded to avoid unbounded scans by default. | |
| ttl_lower_bound = end_date - timedelta(days=30) | |
| # If user provided start_date, use the max of user start_date and ttl_lower_bound. | |
| if start_date is not None: | |
| if start_date < ttl_lower_bound: | |
| import warnings | |
| warnings.warn( | |
| f"Provided start_date ({start_date}) is earlier than TTL-based lower bound ({ttl_lower_bound}). Overriding start_date to {ttl_lower_bound}." | |
| ) | |
| start_date = max(start_date, ttl_lower_bound) | |
| else: | |
| start_date = ttl_lower_bound |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you havent given start date and tz. Both matters very much. It should be:
start=start_date, end=end_date, freq="1s", tz=timezone.utc
Outdated
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message formatting is incorrect. The f-string should format pd.DataFrame as a string, not call type() on it. This will result in an error message like "...type <class 'type'> instead of type <class 'type'>".
Consider changing to:
f"Please provide an entity_df of type pd.DataFrame or dask.dataframe.DataFrame instead of type {type(entity_df)}"| f"Please provide an entity_df of type {type(pd.DataFrame)} or dask.dataframe instead of type {type(entity_df)}" | |
| f"Please provide an entity_df of type pd.DataFrame or dask.dataframe.DataFrame instead of type {type(entity_df)}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@aniketpalu Please fix this as well.
Outdated
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Commented-out code should be removed. If this line is no longer needed due to the refactoring below, it should be deleted rather than commented out.
| # df_to_join = _merge(entity_df_with_features, df_to_join, join_keys) |
Outdated
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[nitpick] The logic for checking missing join keys could be simplified. In non-entity mode, the synthetic entity_df will never contain join keys (it only has the event_timestamp column). Therefore, the any() check is unnecessary overhead.
Consider simplifying to:
current_join_keys = join_keys
if non_entity_mode:
current_join_keys = []This makes the logic clearer and more efficient since we know join keys are never present in the synthetic entity_df.
| if non_entity_mode and any( | |
| k not in entity_df_with_features.columns for k in join_keys | |
| ): | |
| if non_entity_mode: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree with copilot here
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,70 @@ | ||||||
| from datetime import datetime, timezone | ||||||
| from unittest.mock import MagicMock | ||||||
|
|
||||||
| from feast.entity import Entity | ||||||
| from feast.feature_view import FeatureView, Field | ||||||
| from feast.infra.offline_stores.dask import ( | ||||||
| DaskOfflineStore, | ||||||
| DaskOfflineStoreConfig, | ||||||
| ) | ||||||
| from feast.infra.offline_stores.file_source import FileSource | ||||||
| from feast.repo_config import RepoConfig | ||||||
| from feast.types import Float32, ValueType | ||||||
|
|
||||||
|
|
||||||
| def _mock_dask_offline_store_config(): | ||||||
| return DaskOfflineStoreConfig(type="dask") | ||||||
|
|
||||||
|
|
||||||
| def _mock_entity(): | ||||||
| return [ | ||||||
| Entity( | ||||||
| name="driver_id", | ||||||
| join_keys=["driver_id"], | ||||||
| description="Driver ID", | ||||||
| value_type=ValueType.INT64, | ||||||
| ) | ||||||
| ] | ||||||
|
|
||||||
|
|
||||||
| def _mock_feature_view(): | ||||||
| return FeatureView( | ||||||
| name="driver_stats", | ||||||
| entities=_mock_entity(), | ||||||
| schema=[ | ||||||
| Field(name="conv_rate", dtype=Float32), | ||||||
| ], | ||||||
| source=FileSource( | ||||||
| path="dummy.parquet", # not read in this test | ||||||
| timestamp_field="event_timestamp", | ||||||
| ), | ||||||
| ) | ||||||
|
|
||||||
|
|
||||||
| def test_dask_non_entity_historical_retrieval_accepts_dates(): | ||||||
| repo_config = RepoConfig( | ||||||
| project="test_project", | ||||||
| registry="test_registry", | ||||||
| provider="local", | ||||||
| offline_store=_mock_dask_offline_store_config(), | ||||||
| ) | ||||||
|
|
||||||
| fv = _mock_feature_view() | ||||||
|
|
||||||
| # Expect this to work once non-entity mode is implemented for Dask-based store | ||||||
|
||||||
| # Expect this to work once non-entity mode is implemented for Dask-based store | |
| # Verify that non-entity mode (entity_df=None) accepts start_date and end_date parameters |
Outdated
Copilot
AI
Nov 12, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is outdated. Since the implementation is complete, this comment should be updated to describe what is being asserted.
Consider updating to something like:
# Should return a RetrievalJob instance| # When implemented, should return a RetrievalJob | |
| # Should return a RetrievalJob instance |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think this is good enough to validate the data based retrieval
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.