[SPARK-51883][DOCS][PYTHON] Python Data Source user guide for filter pushdown #50684

python/docs/source/tutorial/sql/python_data_source.rst (218 changes: 213 additions & 5 deletions)
@@ -287,7 +287,7 @@ This is the same dummy streaming reader that generate 2 rows every batch impleme
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offsets as input and reads an iterator of data deterministically.
This is called when replaying batches during restart or after failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
@@ -356,17 +356,32 @@ For libraries that are used inside a method, the import must be inside the method
from pyspark import TaskContext
context = TaskContext.get()
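
For instance, a hypothetical ``read()`` method (not part of the guide's examples) would place the import inside the method body, so the import also runs on the executors rather than only in the driver script:

.. code-block:: python

    def read(self, partition):
        # Import inside the method so the import is executed wherever the
        # method runs (on the executors), where module-level imports from
        # the driver script are not available.
        from pyspark import TaskContext

        context = TaskContext.get()
        yield (context.partitionId(),)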

Mutating State
~~~~~~~~~~~~~~
The following methods should not mutate internal state. Changes made to the object's state in these methods may or may not be visible to future calls.

- ``DataSourceReader.partitions()``
- ``DataSourceReader.read()``
- ``DataSourceStreamReader.read()``
- ``SimpleDataSourceStreamReader.readBetweenOffsets()``
- All writer methods

All other methods, such as ``DataSource.schema()`` and ``DataSourceStreamReader.latestOffset()``, can be stateful. Changes to the object state made in these methods are visible to future calls.
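
As an illustration (a minimal sketch using a hypothetical ``CounterStreamReader``, not one of the guide's examples), a stream reader can advance its cursor in ``latestOffset()``, while ``read()`` takes everything it needs from the partition value instead of relying on or mutating ``self``:

.. code-block:: python

    from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


    class CounterStreamReader(DataSourceStreamReader):
        def __init__(self):
            self.current = 0

        def initialOffset(self):
            return {"offset": 0}

        def latestOffset(self):
            # Stateful method: mutating the object here is fine, and the
            # change is visible to future calls.
            self.current += 2
            return {"offset": self.current}

        def partitions(self, start, end):
            # Pass the range that read() needs through the partition value.
            return [InputPartition((start["offset"], end["offset"]))]

        def read(self, partition):
            # Stateless method: derive rows only from the partition value
            # and do not mutate self here.
            first, last = partition.value
            for i in range(first, last):
                yield (i,)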

Using a Python Data Source
--------------------------

Use a Python Data Source in Batch Query
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

After defining your data source, you must register it before using it.

.. code-block:: python

spark.dataSource.register(FakeDataSource)

Read From a Python Data Source
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Read from the fake datasource with the default schema and options:
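
The read itself is collapsed in this diff; a minimal sketch of such a read, assuming the ``fake`` short name registered above:

.. code-block:: python

    spark.read.format("fake").load().show()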

@@ -412,7 +427,8 @@ Read from the fake datasource with a different number of rows:
# | Douglas James|2007-01-18| 46226| Alabama|
# +--------------+----------+-------+------------+

Write To a Python Data Source
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

To write data to a custom location, make sure that you specify the `mode()` clause. Supported modes are `append` and `overwrite`.
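
The write example is collapsed in this diff; a minimal sketch of such a write, assuming a small DataFrame and the ``fake`` data source registered above:

.. code-block:: python

    df = spark.range(10)
    df.write.format("fake").mode("append").save()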

@@ -424,7 +440,8 @@ To write data to a custom location, make sure that you specify the `mode()` clau
# You can check the Spark log (standard error) to see the output of the write operation.
# Total number of rows: 10

Use a Python Data Source in Streaming Query
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Once the Python data source is registered, we can also use it in streaming queries, as the source of ``readStream()`` or the sink of ``writeStream()``, by passing its short name or full name to ``format()``.
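
The streaming example itself is collapsed in this diff; a minimal sketch, assuming the ``fake`` source registered above and writing to the built-in console sink:

.. code-block:: python

    query = (
        spark.readStream.format("fake")
        .load()
        .writeStream.format("console")
        .start()
    )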

@@ -517,6 +534,197 @@ The following example demonstrates how to implement a basic Data Source using Ar

df.show()

Filter Pushdown in Python Data Sources
--------------------------------------

Filter pushdown is an optimization technique that allows data sources to handle filters natively, reducing the amount of data that needs to be transferred and processed by Spark.

The filter pushdown API enables ``DataSourceReader`` to selectively push down filters from the query to the source.

You must turn on the configuration ``spark.sql.python.filterPushdown.enabled`` to enable filter pushdown.
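
For example, the flag can be set on an active session before running the query (setting it via ``--conf`` at launch works as well):

.. code-block:: python

    spark.conf.set("spark.sql.python.filterPushdown.enabled", True)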

**How Filter Pushdown Works**

When a query includes filter conditions, Spark can pass these filters to the data source implementation, which can then apply the filters during data retrieval. This is especially beneficial for:

- Data sources backed by formats that allow efficient filtering (e.g. key-value stores)
- APIs that support filtering (e.g. REST and GraphQL APIs)

The data source receives the filters, decides which ones can be pushed down, and returns the remaining filters to Spark to be applied later.

**Implementing Filter Pushdown**

To enable filter pushdown in your Python Data Source, implement the ``pushFilters`` method in your ``DataSourceReader`` class:

.. code-block:: python

import math
from typing import Iterable, List
from pyspark.sql.datasource import (
DataSource,
DataSourceReader,
EqualTo,
Filter,
GreaterThan,
LessThan,
GreaterThanOrEqual,
LessThanOrEqual,
)


class PrimesDataSource(DataSource):
"""
A data source that enumerates prime numbers.
"""

@classmethod
def name(cls):
return "primes"

def schema(self):
return "p int"

def reader(self, schema: str):
return PrimesDataSourceReader(schema, self.options)


class PrimesDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: str = schema
self.options = options
self.min = 2
self.max = math.inf

def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]:
"""
Parameters
----------
filters : list of Filter objects
The AND of the filters that Spark would like to push down

Returns
-------
iterable of Filter objects
Filters that could not be pushed down and still need to be
evaluated by Spark
"""
for f in filters:
print(f"Got filter: {f}")
# Handle constraints on the range of numbers
if isinstance(f, EqualTo):
self.min = max(self.min, f.value)
self.max = min(self.max, f.value)
elif isinstance(f, GreaterThan):
self.min = max(self.min, f.value + 1)
elif isinstance(f, LessThan):
self.max = min(self.max, f.value - 1)
elif isinstance(f, GreaterThanOrEqual):
self.min = max(self.min, f.value)
elif isinstance(f, LessThanOrEqual):
self.max = min(self.max, f.value)
else:
yield f # Let Spark handle unsupported filters

def read(self, partition):
# Use the pushed filters to filter data during read
num = self.min
while num <= self.max:
if self._is_prime(num):
yield [num]
num += 1

@staticmethod
def _is_prime(n: int) -> bool:
"""Check if a number is prime."""
if n < 2:
return False
for i in range(2, int(n**0.5) + 1):
if n % i == 0:
return False
return True

# Register the data source
spark.dataSource.register(PrimesDataSource)
spark.read.format("primes").load().filter("2000 <= p and p < 2050").show()

# Got filter: IsNotNull(attribute=('p',))
# Got filter: GreaterThanOrEqual(attribute=('p',), value=2000)
# Got filter: LessThan(attribute=('p',), value=2050)
# +----+
# | p|
# +----+
# |2003|
# |2011|
# |2017|
# |2027|
# |2029|
# |2039|
# +----+

**Notes**

``pushFilters()`` is called only if there are filters available to push down.
If it is called, the call happens before ``partitions()``.

**Supported Filter Types**

Spark supports pushing down the following filter types:

.. list-table::
:header-rows: 1

* - Filter Type
- Class
- SQL Equivalent
* - Equality
- ``EqualTo``
- ``column = constant``
* - Greater Than
- ``GreaterThan``
- ``column > constant``
* - Greater Than or Equal
- ``GreaterThanOrEqual``
- ``column >= constant``
* - Less Than
- ``LessThan``
- ``column < constant``
* - Less Than or Equal
- ``LessThanOrEqual``
- ``column <= constant``
* - IN list
- ``In``
- ``column IN (constants)``
* - IS NULL
- ``IsNull``
- ``column IS NULL``
* - IS NOT NULL
- ``IsNotNull``
- ``column IS NOT NULL``
* - String Contains
- ``StringContains``
- ``column LIKE '%constant%'``
* - String Starts With
- ``StringStartsWith``
- ``column LIKE 'constant%'``
* - String Ends With
- ``StringEndsWith``
- ``column LIKE '%constant'``
* - NOT
- ``Not``
- ``NOT filter``

Only filters of the supported types are passed to the ``pushFilters`` method; Spark evaluates any other predicates itself after reading the data.
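
For example, a source that never produces NULL values could accept ``IsNotNull`` filters simply by not returning them (a hypothetical sketch; ``NoNullsReader`` is illustrative only, and ``IsNotNull`` carries only the ``attribute`` field, as the output above shows):

.. code-block:: python

    from typing import Iterable, List

    from pyspark.sql.datasource import DataSourceReader, Filter, IsNotNull


    class NoNullsReader(DataSourceReader):  # hypothetical reader
        def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]:
            for f in filters:
                if isinstance(f, IsNotNull):
                    # Accept the filter by not returning it: this source
                    # never produces NULLs, so the condition always holds.
                    continue
                yield f  # let Spark evaluate everything else

        def read(self, partition):
            ...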

**Best Practices**

1. **Handle what you can, pass back what you can't**: Implement pushdown for filter types that your data source can handle efficiently. Return the remaining filters for Spark to process.

2. **Anticipate new filter types**: More filter types may be added in the future, so do not assume that your implementation handles all possible filters.

3. **Use pushed filters throughout the execution**: Store pushed filters and respect them in both ``partitions()`` and ``read()``, as in the sketch after this list.

4. **Test performance**: Compare query performance with and without filter pushdown to ensure it's providing the expected benefits.
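
A sketch of points 1 and 3 together, using hypothetical names (``CatalogReader``, ``FAKE_BACKEND``, and the ``category`` column are illustrative only): accepted filters are stored in ``pushFilters()`` and consulted again in both ``partitions()`` and ``read()``.

.. code-block:: python

    from typing import Iterable, List

    from pyspark.sql.datasource import (
        DataSourceReader,
        EqualTo,
        Filter,
        InputPartition,
    )

    # Hypothetical backend: category -> rows of (category, count).
    FAKE_BACKEND = {
        "books": [("books", 12)],
        "games": [("games", 7)],
    }


    class CatalogReader(DataSourceReader):  # hypothetical reader
        def __init__(self, schema, options):
            self.schema = schema
            self.options = options
            self.pushed: List[Filter] = []  # filters accepted for pushdown

        def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]:
            for f in filters:
                # Accept only equality filters on the "category" column;
                # everything else goes back to Spark (points 1 and 2).
                if isinstance(f, EqualTo) and f.attribute == ("category",):
                    self.pushed.append(f)
                else:
                    yield f

        def partitions(self):
            # Point 3: prune partitions using the pushed filters.
            wanted = {f.value for f in self.pushed} or set(FAKE_BACKEND)
            return [InputPartition(c) for c in wanted]

        def read(self, partition):
            # Point 3: only scan the data selected by the pushed filters.
            yield from FAKE_BACKEND.get(partition.value, [])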

Usage Notes
-----------
