4 changes: 3 additions & 1 deletion .gitignore
@@ -12,4 +12,6 @@ terraform.tfstate.backup
 **/derby.log
 **/metastore_db/*
 .env
-.idea
+.idea
+.venv
+**/.ipynb_checkpoints
5 changes: 5 additions & 0 deletions module_5_compute_engine/README.md
@@ -0,0 +1,5 @@
# Feast Compute Engine Demo
This is a demo of the Feast Compute Engine, which lets you run feature transformations (filters, aggregations, custom UDFs, etc.) on a compute engine of your choice.

## Install and Run
See the notebook [compute_example.ipynb](compute_example.ipynb) for the full walkthrough.
334 changes: 334 additions & 0 deletions module_5_compute_engine/compute_example.ipynb
@@ -0,0 +1,334 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"application/vnd.databricks.v1+cell": {
"inputWidgets": {},
"nuid": "0c4646a7-272d-44ec-95fe-3480a267e173",
"showTitle": false,
"title": ""
},
"collapsed": true,
"jupyter": {
"outputs_hidden": true
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"zsh:1: no matches found: feast[spark,aws,redis]\r\n",
"Note: you may need to restart the kernel to use updated packages.\n"
]
}
],
"source": [
"%pip install feast[spark,postgres,snowflake]"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"source": [
"## Apply feature definition"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"outputs": [],
"source": [
"import os\n",
"os.chdir(\"./feature_repo\")\n"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/Users/haoxu/dev/feature_store/feast/sdk/python/feast/batch_feature_view.py:93: RuntimeWarning: Batch feature views are experimental features in alpha development. Some functionality may still be unstable so functionality can change in the future.\n",
" warnings.warn(\n",
"WARNING: Using incubator modules: jdk.incubator.vector\n",
"Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties\n",
"Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
"25/06/10 20:39:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
"25/06/10 20:39:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"No project found in the repository. Using project name feast_demo_compute_engine defined in feature_store.yaml\n",
"Applying changes for project feast_demo_compute_engine\n",
"Deploying infrastructure for order_stats\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"INFO:py4j.clientserver:Closing down clientserver connection\n"
]
},
{
"data": {
"text/plain": [
"0"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"os.system(\"feast apply\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"source": [
"## Load Feature store API"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {
"application/vnd.databricks.v1+cell": {
"inputWidgets": {},
"nuid": "37baef9e-ffac-4cf9-ab6c-778e0321d544",
"showTitle": false,
"title": ""
}
},
"outputs": [],
"source": [
"from feast import FeatureStore\n",
"store = FeatureStore(repo_path=\".\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"source": [
"## Materialization"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": false,
"jupyter": {
"outputs_hidden": false
}
},
"source": [
"### Local"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {
"application/vnd.databricks.v1+cell": {
"inputWidgets": {},
"nuid": "83ad34fa-cbd3-4100-8c01-40720ddbc14e",
"showTitle": false,
"title": ""
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Materializing \u001b[1m\u001b[32m1\u001b[0m feature views from \u001b[1m\u001b[32m1992-04-20 00:00:00+00:00\u001b[0m to \u001b[1m\u001b[32m2025-04-21 00:00:00+00:00\u001b[0m into the \u001b[1m\u001b[32mpostgres\u001b[0m online store.\n",
"\n",
"\u001b[1m\u001b[32morder_stats\u001b[0m:\n",
"Elapsed time: 84.29606699943542 seconds\n"
]
}
],
"source": [
"from datetime import datetime\n",
"import time\n",
"\n",
"start_time = time.time()\n",
"store.materialize(\n",
" start_date=datetime(1992,4,20),\n",
" end_date=datetime(2025,4,21),\n",
")\n",
"end_time = time.time()\n",
"elapsed_time = end_time - start_time\n",
"print(f\"Elapsed time: {elapsed_time} seconds\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Retrieve feature data"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"{'O_CUSTKEY': [397082], 'O_TOTALPRICE': [304962.84375]}"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"store.get_online_features(\n",
" features=[\"order_stats:O_TOTALPRICE\"],\n",
" entity_rows=[{\"O_CUSTKEY\": 397082}]\n",
").to_dict()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Spark"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Materializing \u001b[1m\u001b[32m1\u001b[0m feature views from \u001b[1m\u001b[32m1992-04-20 00:00:00+00:00\u001b[0m to \u001b[1m\u001b[32m2025-04-21 00:00:00+00:00\u001b[0m into the \u001b[1m\u001b[32mpostgres\u001b[0m online store.\n",
"\n",
"\u001b[1m\u001b[32morder_stats\u001b[0m:\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"[Stage 8:===================================================> (10 + 1) / 11]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Elapsed time: 90.78104996681213 seconds\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" "
]
}
],
"source": [
"# Run in local mode\n",
"from datetime import datetime\n",
"import time\n",
"\n",
"start_time = time.time()\n",
"store.materialize(\n",
" start_date=datetime(1992,4,20),\n",
" end_date=datetime(2025,4,21),\n",
")\n",
"end_time = time.time()\n",
"elapsed_time = end_time - start_time\n",
"print(f\"Elapsed time: {elapsed_time} seconds\")"
]
}
],
"metadata": {
"application/vnd.databricks.v1+notebook": {
"dashboards": [],
"language": "python",
"notebookMetadata": {
"mostRecentlyExecutedCommandWithImplicitDF": {
"commandId": 402528431658022,
"dataframes": [
"_sqldf"
]
},
"pythonIndentUnit": 2
},
"notebookName": "Feast demo",
"notebookOrigID": 1254642919516165,
"widgets": {}
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.9"
},
"vscode": {
"interpreter": {
"hash": "7d634b9af180bcb32a446a43848522733ff8f5bbf0cc46dba1a83bede04bf237"
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}
25 changes: 25 additions & 0 deletions module_5_compute_engine/feature_repo/data_sources.py
@@ -0,0 +1,25 @@
from feast import (
Field,
FileSource,
PushSource,
RequestSource,
SnowflakeSource
)
from feast.types import Int64, Float32

# driver_stats = SparkSource(
# name="driver_stats_source",
# path="../data/driver_stats_lat_lon.parquet",
# timestamp_field="event_timestamp",
# created_timestamp_column="created",
# description="A table describing the stats of a driver based on hourly logs",
# owner="[email protected]",
# )


tpch_sf = SnowflakeSource(
database="SNOWFLAKE_SAMPLE_DATA",
schema="TPCH_SF10",
table="ORDERS",
timestamp_field="O_ORDERDATE"
)
11 changes: 11 additions & 0 deletions module_5_compute_engine/feature_repo/entities.py
@@ -0,0 +1,11 @@
from feast import (
Entity,
ValueType,
)

customer = Entity(
name="customer",
join_keys=["O_CUSTKEY"],
value_type=ValueType.INT64,
description="Custoemr id",
)
9 changes: 9 additions & 0 deletions module_5_compute_engine/feature_repo/feature_services.py
@@ -0,0 +1,9 @@
from feast import FeatureService

from features import *
#
# feature_service = FeatureService(
# name="model_v1",
# features=[bfv[["conv_rate"]]],
# owner="[email protected]",
# )