From 623a54eceabcfeb0e841c8789f91ea4128ea73f9 Mon Sep 17 00:00:00 2001 From: Haoyu Weng Date: Wed, 23 Apr 2025 09:38:10 -0700 Subject: [PATCH 1/7] add mutating state section --- .../tutorial/sql/python_data_source.rst | 23 +++++++++++++++---- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index 22b2a0b5f3c7b..7aba62daa53cb 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -287,7 +287,7 @@ This is the same dummy streaming reader that generate 2 rows every batch impleme def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]: """ Takes start and end offset as input and read an iterator of data deterministically. - This is called whe query replay batches during restart or after failure. + This is called when replaying batches during restart or after failure. """ start_idx = start["offset"] end_idx = end["offset"] @@ -356,9 +356,19 @@ For library that are used inside a method, it must be imported inside the method from pyspark import TaskContext context = TaskContext.get() +Mutating State +~~~~~~~~~~~~~~ +Some methods such as DataSourceReader.read() and DataSourceReader.partitions() must be stateless. Changes to the object state made in these methods are not guaranteed to be visible or invisible to future invocations. + +Other methods such as DataSource.schema() and DataSourceStreamReader.latestOffset() can be stateful. Changes to the object state made in these methods are visible to future invocations. + +Refer to the documentation of each method for more details. + Using a Python Data Source -------------------------- -**Use a Python Data Source in Batch Query** + +Use a Python Data Source in Batch Query +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ After defining your data source, it must be registered before usage. @@ -366,7 +376,8 @@ After defining your data source, it must be registered before usage. spark.dataSource.register(FakeDataSource) -**Read From a Python Data Source** +Read From a Python Data Source +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Read from the fake datasource with the default schema and options: @@ -412,7 +423,8 @@ Read from the fake datasource with a different number of rows: # | Douglas James|2007-01-18| 46226| Alabama| # +--------------+----------+-------+------------+ -**Write To a Python Data Source** +Write To a Python Data Source +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ To write data to a custom location, make sure that you specify the `mode()` clause. Supported modes are `append` and `overwrite`. @@ -424,7 +436,8 @@ To write data to a custom location, make sure that you specify the `mode()` clau # You can check the Spark log (standard error) to see the output of the write operation. # Total number of rows: 10 -**Use a Python Data Source in Streaming Query** +Use a Python Data Source in Streaming Query +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Once we register the python data source, we can also use it in streaming queries as source of readStream() or sink of writeStream() by passing short name or full name to format(). 
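The hunk above ends with the note that a registered Python data source can also be used in streaming queries. As a minimal sketch of what that looks like in practice — assuming the ``FakeDataSource`` from earlier in this tutorial is registered under the short name ``fake`` and implements a stream reader, as the dummy streaming reader above suggests — a streaming read could be wired up like this:

.. code-block:: python

    # Register the data source, then refer to it by short name in readStream()/writeStream().
    spark.dataSource.register(FakeDataSource)

    query = (
        spark.readStream.format("fake")    # source: the Python data source
        .load()
        .writeStream.format("console")     # sink: print each micro-batch
        .trigger(availableNow=True)        # process available data, then stop
        .start()
    )
    query.awaitTermination()

The same short name (or the full class name) can also be passed to ``writeStream().format()`` to use the data source as a streaming sink.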
From 3f090c2ecb331310843c9174fa1a39ed70ad3fc5 Mon Sep 17 00:00:00 2001 From: Haoyu Weng Date: Wed, 23 Apr 2025 10:13:51 -0700 Subject: [PATCH 2/7] add filter pushdown docs --- .../tutorial/sql/python_data_source.rst | 115 ++++++++++++++++++ python/pyspark/sql/datasource.py | 5 + 2 files changed, 120 insertions(+) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index 7aba62daa53cb..7d070e626529b 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -530,6 +530,121 @@ The following example demonstrates how to implement a basic Data Source using Ar df.show() +Filter Pushdown in Python Data Sources +-------------------------------------- + +Filter pushdown is an optimization technique that allows data sources to handle filters natively, reducing the amount of data that needs to be transferred and processed by Spark. + +The filter pushdown API is introduced in Spark 4.1, enabling DataSourceReader to selectively push down filters from the query to the source. + +You must turn on the configuration ``spark.sql.python.filterPushdown.enabled`` to enable filter pushdown. + +**How Filter Pushdown Works** + +When a query includes filter conditions, Spark can pass these filters to the data source implementation, which can then apply the filters during data retrieval. This is especially beneficial for: + +- Data sources backed by formats that allow efficient filtering (e.g. key-value stores) +- APIs that support filtering (e.g. REST and GraphQL APIs) + +The data source receives the filters, decides which ones can be pushed down, and returns the remaining filters to Spark to be applied later. + +**Implementing Filter Pushdown** + +To enable filter pushdown in your Python Data Source, implement the ``pushFilters`` method in your ``DataSourceReader`` class: + +.. code-block:: python + + from pyspark.sql.datasource import EqualTo, Filter, GreaterThan, LessThan + + def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]: + """ + Parameters + ---------- + filters : list of Filter objects + The AND of the filters that Spark would like to push down + + Returns + ------- + iterable of Filter objects + Filters that could not be pushed down and still need to be + evaluated by Spark + """ + # Process the filters and determine which ones can be handled by the data source + pushed = [] + for filter in filters: + if isinstance(filter, (EqualTo, GreaterThan, LessThan)): + pushed.append(filter) + # Check for other supported filter types... + else: + yield filter # Let Spark handle unsupported filters + + # Store the pushed filters for use in partitions() and read() methods + self.pushed_filters = pushed + +**Notes** + +pushFilters() is called only if there are filters available to push down. +If it is called, the call happens before partitions(). + +**Supported Filter Types** + +Spark supports pushing down the following filter types: + +.. 
list-table:: + :header-rows: 1 + + * - Filter Type + - Class + - SQL Equivalent + * - Equality + - ``EqualTo`` + - ``column = constant`` + * - Greater Than + - ``GreaterThan`` + - ``column > constant`` + * - Greater Than or Equal + - ``GreaterThanOrEqual`` + - ``column >= constant`` + * - Less Than + - ``LessThan`` + - ``column < constant`` + * - Less Than or Equal + - ``LessThanOrEqual`` + - ``column <= constant`` + * - IN list + - ``In`` + - ``column IN (constants)`` + * - IS NULL + - ``IsNull`` + - ``column IS NULL`` + * - IS NOT NULL + - ``IsNotNull`` + - ``column IS NOT NULL`` + * - String Contains + - ``StringContains`` + - ``column LIKE '%constant%'`` + * - String Starts With + - ``StringStartsWith`` + - ``column LIKE 'constant%'`` + * - String Ends With + - ``StringEndsWith`` + - ``column LIKE '%constant'`` + * - NOT + - ``Not`` + - ``NOT filter`` + +Only supported filters are passed to the ``pushFilters`` method. + +**Best Practices** + +1. **Handle what you can, pass back what you can't**: Implement pushdown for filter types that your data source can handle efficiently. Return the remaining filters for Spark to process. + +2. **Anticipate new filter types**: More filter types may be added in the future, so do not assume that you handled all possible filters in your implementation. + +3. **Use pushed filters throughout the execution**: Store pushed filters and respect them in both ``partitions()`` and ``read()``. + +4. **Test performance**: Compare query performance with and without filter pushdown to ensure it's providing the expected benefits. + Usage Notes ----------- diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py index c9704ff9f2599..4ff48b2665218 100644 --- a/python/pyspark/sql/datasource.py +++ b/python/pyspark/sql/datasource.py @@ -551,6 +551,11 @@ def pushFilters(self, filters: List["Filter"]) -> Iterable["Filter"]: This method is allowed to modify `self`. The object must remain picklable. Modifications to `self` are visible to the `partitions()` and `read()` methods. + Notes + ----- + Configuration `spark.sql.python.filterPushdown.enabled` must be set to `true` + to implement this method. + Examples -------- Example filters and the resulting arguments passed to pushFilters: From 419da8732c53410ab0af0bd4093687a079df1ada Mon Sep 17 00:00:00 2001 From: wengh Date: Wed, 30 Apr 2025 20:46:42 -0400 Subject: [PATCH 3/7] address comments and change example to prime numbers source --- .../tutorial/sql/python_data_source.rst | 111 ++++++++++++++---- python/pyspark/sql/datasource.py | 5 - 2 files changed, 86 insertions(+), 30 deletions(-) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index 7d070e626529b..0f6a3e4a83443 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -535,7 +535,7 @@ Filter Pushdown in Python Data Sources Filter pushdown is an optimization technique that allows data sources to handle filters natively, reducing the amount of data that needs to be transferred and processed by Spark. -The filter pushdown API is introduced in Spark 4.1, enabling DataSourceReader to selectively push down filters from the query to the source. +The filter pushdown API enables DataSourceReader to selectively push down filters from the query to the source. You must turn on the configuration ``spark.sql.python.filterPushdown.enabled`` to enable filter pushdown. 
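Since the hunk above keeps the requirement that ``spark.sql.python.filterPushdown.enabled`` be turned on, here is a minimal sketch of enabling it, either on an active session or when building one (the application name is only illustrative):

.. code-block:: python

    # Enable filter pushdown for Python data sources on the active session.
    spark.conf.set("spark.sql.python.filterPushdown.enabled", "true")

    # Or set it when the session is created.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder.appName("python-filter-pushdown-demo")
        .config("spark.sql.python.filterPushdown.enabled", "true")
        .getOrCreate()
    )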
@@ -554,32 +554,93 @@ To enable filter pushdown in your Python Data Source, implement the ``pushFilter .. code-block:: python - from pyspark.sql.datasource import EqualTo, Filter, GreaterThan, LessThan - - def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]: + import math + from typing import Iterable, List + from pyspark.sql.datasource import ( + DataSource, + DataSourceReader, + EqualTo, + Filter, + GreaterThan, + LessThan, + GreaterThanOrEqual, + LessThanOrEqual, + ) + + + class PrimesDataSource(DataSource): """ - Parameters - ---------- - filters : list of Filter objects - The AND of the filters that Spark would like to push down - - Returns - ------- - iterable of Filter objects - Filters that could not be pushed down and still need to be - evaluated by Spark + A data source that enumerates prime numbers. """ - # Process the filters and determine which ones can be handled by the data source - pushed = [] - for filter in filters: - if isinstance(filter, (EqualTo, GreaterThan, LessThan)): - pushed.append(filter) - # Check for other supported filter types... - else: - yield filter # Let Spark handle unsupported filters - - # Store the pushed filters for use in partitions() and read() methods - self.pushed_filters = pushed + + @classmethod + def name(cls): + return "primes" + + def schema(self): + return "p int" + + def reader(self, schema: str): + return PrimesDataSourceReader(schema, self.options) + + + class PrimesDataSourceReader(DataSourceReader): + def __init__(self, schema, options): + self.schema: str = schema + self.options = options + self.lower_bound = 2 + self.upper_bound = math.inf + + def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]: + """ + Parameters + ---------- + filters : list of Filter objects + The AND of the filters that Spark would like to push down + + Returns + ------- + iterable of Filter objects + Filters that could not be pushed down and still need to be + evaluated by Spark + """ + for filter in filters: + print(f"Got filter: {filter}") + if isinstance(filter, EqualTo): + self.lower_bound = max(self.lower_bound, filter.value) + self.upper_bound = min(self.upper_bound, filter.value) + elif isinstance(filter, GreaterThan): + self.lower_bound = max(self.lower_bound, filter.value + 1) + elif isinstance(filter, LessThan): + self.upper_bound = min(self.upper_bound, filter.value - 1) + elif isinstance(filter, GreaterThanOrEqual): + self.lower_bound = max(self.lower_bound, filter.value) + elif isinstance(filter, LessThanOrEqual): + self.upper_bound = min(self.upper_bound, filter.value) + else: + yield filter # Let Spark handle unsupported filters + + def read(self, partition): + # Use the pushed filters to filter data during read + num = self.lower_bound + while num <= self.upper_bound: + if self._is_prime(num): + yield [num] + num += 1 + + @staticmethod + def _is_prime(n: int) -> bool: + """Check if a number is prime.""" + if n < 2: + return False + for i in range(2, int(n**0.5) + 1): + if n % i == 0: + return False + return True + + # Register the data source + spark.dataSource.register(PrimesDataSource) + spark.read.format("primes").load().filter("2000 <= p and p < 2050").show() **Notes** diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py index 4ff48b2665218..c9704ff9f2599 100644 --- a/python/pyspark/sql/datasource.py +++ b/python/pyspark/sql/datasource.py @@ -551,11 +551,6 @@ def pushFilters(self, filters: List["Filter"]) -> Iterable["Filter"]: This method is allowed to modify `self`. 
The object must remain picklable. Modifications to `self` are visible to the `partitions()` and `read()` methods. - Notes - ----- - Configuration `spark.sql.python.filterPushdown.enabled` must be set to `true` - to implement this method. - Examples -------- Example filters and the resulting arguments passed to pushFilters: From 4f66e93ed1542b2297f189f935ab304d9b32f507 Mon Sep 17 00:00:00 2001 From: wengh Date: Thu, 1 May 2025 23:34:59 -0400 Subject: [PATCH 4/7] shorter names --- .../tutorial/sql/python_data_source.rst | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index 0f6a3e4a83443..af80d3c0821f0 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -588,8 +588,8 @@ To enable filter pushdown in your Python Data Source, implement the ``pushFilter def __init__(self, schema, options): self.schema: str = schema self.options = options - self.lower_bound = 2 - self.upper_bound = math.inf + self.min = 2 + self.max = math.inf def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]: """ @@ -604,26 +604,27 @@ To enable filter pushdown in your Python Data Source, implement the ``pushFilter Filters that could not be pushed down and still need to be evaluated by Spark """ - for filter in filters: - print(f"Got filter: {filter}") - if isinstance(filter, EqualTo): - self.lower_bound = max(self.lower_bound, filter.value) - self.upper_bound = min(self.upper_bound, filter.value) - elif isinstance(filter, GreaterThan): - self.lower_bound = max(self.lower_bound, filter.value + 1) - elif isinstance(filter, LessThan): - self.upper_bound = min(self.upper_bound, filter.value - 1) - elif isinstance(filter, GreaterThanOrEqual): - self.lower_bound = max(self.lower_bound, filter.value) - elif isinstance(filter, LessThanOrEqual): - self.upper_bound = min(self.upper_bound, filter.value) + for f in filters: + print(f"Got filter: {f}") + # Handle constraints on the range of numbers + if isinstance(f, EqualTo): + self.min = max(self.min, f.value) + self.max = min(self.max, f.value) + elif isinstance(f, GreaterThan): + self.min = max(self.min, f.value + 1) + elif isinstance(f, LessThan): + self.max = min(self.max, f.value - 1) + elif isinstance(f, GreaterThanOrEqual): + self.min = max(self.min, f.value) + elif isinstance(f, LessThanOrEqual): + self.max = min(self.max, f.value) else: - yield filter # Let Spark handle unsupported filters + yield f # Let Spark handle unsupported filters def read(self, partition): # Use the pushed filters to filter data during read - num = self.lower_bound - while num <= self.upper_bound: + num = self.min + while num <= self.max: if self._is_prime(num): yield [num] num += 1 From cfa23d60196a4d03298b4a2af39b53c72bbcff0b Mon Sep 17 00:00:00 2001 From: wengh Date: Thu, 1 May 2025 23:42:06 -0400 Subject: [PATCH 5/7] add output --- .../source/tutorial/sql/python_data_source.rst | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index af80d3c0821f0..4749a1bf2be2b 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -643,6 +643,20 @@ To enable filter pushdown in your Python Data Source, implement the ``pushFilter spark.dataSource.register(PrimesDataSource) 
spark.read.format("primes").load().filter("2000 <= p and p < 2050").show() + # Got filter: IsNotNull(attribute=('p',)) + # Got filter: GreaterThanOrEqual(attribute=('p',), value=2000) + # Got filter: LessThan(attribute=('p',), value=2050) + # +----+ + # | p| + # +----+ + # |2003| + # |2011| + # |2017| + # |2027| + # |2029| + # |2039| + # +----+ + **Notes** pushFilters() is called only if there are filters available to push down. From 0affaaeff004c39ea4b5108362437a72ddc2d1da Mon Sep 17 00:00:00 2001 From: wengh Date: Thu, 1 May 2025 23:58:29 -0400 Subject: [PATCH 6/7] clarify mutating state requirements for DataSource methods --- python/docs/source/tutorial/sql/python_data_source.rst | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index 4749a1bf2be2b..72b7a851e5813 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -358,11 +358,15 @@ For library that are used inside a method, it must be imported inside the method Mutating State ~~~~~~~~~~~~~~ -Some methods such as DataSourceReader.read() and DataSourceReader.partitions() must be stateless. Changes to the object state made in these methods are not guaranteed to be visible or invisible to future invocations. +The following methods should not mutate internal state. Changes to the object state made in these methods are not guaranteed to be visible or invisible to future operations. -Other methods such as DataSource.schema() and DataSourceStreamReader.latestOffset() can be stateful. Changes to the object state made in these methods are visible to future invocations. +- DataSourceReader.partitions() +- DataSourceReader.read() +- DataSourceStreamReader.read() +- SimpleDataSourceStreamReader.readBetweenOffsets() +- All writer methods -Refer to the documentation of each method for more details. +All other methods such as DataSource.schema() and DataSourceStreamReader.latestOffset() can be stateful. Changes to the object state made in these methods are visible to future invocations. Using a Python Data Source -------------------------- From 1e94d4bb536c0117606ab9cb501609cda45b4db5 Mon Sep 17 00:00:00 2001 From: wengh Date: Fri, 2 May 2025 00:00:33 -0400 Subject: [PATCH 7/7] reword --- python/docs/source/tutorial/sql/python_data_source.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst index 72b7a851e5813..db9af1c54ddc8 100644 --- a/python/docs/source/tutorial/sql/python_data_source.rst +++ b/python/docs/source/tutorial/sql/python_data_source.rst @@ -358,7 +358,7 @@ For library that are used inside a method, it must be imported inside the method Mutating State ~~~~~~~~~~~~~~ -The following methods should not mutate internal state. Changes to the object state made in these methods are not guaranteed to be visible or invisible to future operations. +The following methods should not mutate internal state. Changes to the object state made in these methods are not guaranteed to be visible or invisible to future calls. - DataSourceReader.partitions() - DataSourceReader.read() @@ -366,7 +366,7 @@ The following methods should not mutate internal state. 
Changes to the object st - SimpleDataSourceStreamReader.readBetweenOffsets() - All writer methods -All other methods such as DataSource.schema() and DataSourceStreamReader.latestOffset() can be stateful. Changes to the object state made in these methods are visible to future invocations. +All other methods such as DataSource.schema() and DataSourceStreamReader.latestOffset() can be stateful. Changes to the object state made in these methods are visible to future calls. Using a Python Data Source --------------------------
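To make the convention in this final hunk concrete, here is a minimal, hypothetical reader (not part of the patch; the class name, option names, and partition layout are illustrative) that mutates state only in a method where such changes are guaranteed to be visible to later calls, and treats ``partitions()`` and ``read()`` as read-only with respect to ``self``:

.. code-block:: python

    from typing import Iterable, Iterator, List, Tuple

    from pyspark.sql.datasource import DataSourceReader, Filter, InputPartition


    class RangeReader(DataSourceReader):
        """Hypothetical reader illustrating where mutating `self` is safe."""

        def __init__(self, options):
            self.start = int(options.get("start", 0))
            self.end = int(options.get("end", 100))

        def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]:
            # OK to mutate state here: changes made in pushFilters() are
            # visible to the later partitions() and read() calls.
            self.seen_filters = list(filters)
            return filters  # push nothing down in this sketch

        def partitions(self) -> List[InputPartition]:
            # Should NOT mutate `self`: changes made here are not guaranteed
            # to be visible (or invisible) to future calls such as read().
            mid = (self.start + self.end) // 2
            return [InputPartition((self.start, mid)), InputPartition((mid, self.end))]

        def read(self, partition: InputPartition) -> Iterator[Tuple]:
            # Should NOT mutate `self` either: read() runs on executors against
            # a pickled copy of the reader, so keep it a function of its inputs.
            lo, hi = partition.value
            for i in range(lo, hi):
                yield (i,)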