diff --git a/python/docs/source/tutorial/sql/python_data_source.rst b/python/docs/source/tutorial/sql/python_data_source.rst
index 22b2a0b5f3c7b..db9af1c54ddc8 100644
--- a/python/docs/source/tutorial/sql/python_data_source.rst
+++ b/python/docs/source/tutorial/sql/python_data_source.rst
@@ -287,7 +287,7 @@ This is the same dummy streaming reader that generate 2 rows every batch impleme
         def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
             """
             Takes start and end offset as input and read an iterator of data deterministically.
-            This is called whe query replay batches during restart or after failure.
+            This is called when replaying batches during restart or after failure.
             """
             start_idx = start["offset"]
             end_idx = end["offset"]
@@ -356,9 +356,23 @@ For library that are used inside a method, it must be imported inside the method
         from pyspark import TaskContext
         context = TaskContext.get()
 
+Mutating State
+~~~~~~~~~~~~~~
+The following methods should not mutate internal state. Changes made to the object state in these methods may or may not be visible to future calls.
+
+- ``DataSourceReader.partitions()``
+- ``DataSourceReader.read()``
+- ``DataSourceStreamReader.read()``
+- ``SimpleDataSourceStreamReader.readBetweenOffsets()``
+- All writer methods
+
+All other methods, such as ``DataSource.schema()`` and ``DataSourceStreamReader.latestOffset()``, can be stateful. Changes to the object state made in these methods are visible to future calls.
+
 Using a Python Data Source
 --------------------------
-**Use a Python Data Source in Batch Query**
+
+Use a Python Data Source in Batch Query
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 After defining your data source, it must be registered before usage.
 
@@ -366,7 +380,8 @@ After defining your data source, it must be registered before usage.
 
     spark.dataSource.register(FakeDataSource)
 
-**Read From a Python Data Source**
+Read From a Python Data Source
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Read from the fake datasource with the default schema and options:
 
@@ -412,7 +427,8 @@ Read from the fake datasource with a different number of rows:
     # | Douglas James|2007-01-18|  46226|     Alabama|
     # +--------------+----------+-------+------------+
 
-**Write To a Python Data Source**
+Write To a Python Data Source
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 To write data to a custom location, make sure that you specify the `mode()` clause. Supported modes are
 `append` and `overwrite`.
@@ -424,7 +440,8 @@ To write data to a custom location, make sure that you specify the `mode()` clau
     # You can check the Spark log (standard error) to see the output of the write operation.
     # Total number of rows: 10
 
-**Use a Python Data Source in Streaming Query**
+Use a Python Data Source in Streaming Query
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Once we register the python data source, we can also use it in streaming queries as source of readStream() or sink of writeStream() by passing short name or full name to format().
 
@@ -517,6 +534,197 @@ The following example demonstrates how to implement a basic Data Source using Ar
 
     df.show()
 
+Filter Pushdown in Python Data Sources
+--------------------------------------
+
+Filter pushdown is an optimization technique that allows data sources to handle filters natively, reducing the amount of data that needs to be transferred and processed by Spark.
+
+The filter pushdown API enables a ``DataSourceReader`` to selectively push down filters from the query to the source.
+
+You must turn on the configuration ``spark.sql.python.filterPushdown.enabled`` to enable filter pushdown.
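+
+For example, assuming an active ``SparkSession`` named ``spark`` (as in the examples below), the configuration can be set on the current session before running the query:
+
+.. code-block:: python
+
+    # Enable filter pushdown for Python Data Sources on the current session.
+    spark.conf.set("spark.sql.python.filterPushdown.enabled", "true")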
+
+**How Filter Pushdown Works**
+
+When a query includes filter conditions, Spark can pass these filters to the data source implementation, which can then apply the filters during data retrieval. This is especially beneficial for:
+
+- Data sources backed by formats that allow efficient filtering (e.g. key-value stores)
+- APIs that support filtering (e.g. REST and GraphQL APIs)
+
+The data source receives the filters, decides which ones can be pushed down, and returns the remaining filters to Spark to be applied later.
+
+**Implementing Filter Pushdown**
+
+To enable filter pushdown in your Python Data Source, implement the ``pushFilters`` method in your ``DataSourceReader`` class:
+
+.. code-block:: python
+
+    import math
+    from typing import Iterable, List
+    from pyspark.sql.datasource import (
+        DataSource,
+        DataSourceReader,
+        EqualTo,
+        Filter,
+        GreaterThan,
+        LessThan,
+        GreaterThanOrEqual,
+        LessThanOrEqual,
+    )
+
+
+    class PrimesDataSource(DataSource):
+        """
+        A data source that enumerates prime numbers.
+        """
+
+        @classmethod
+        def name(cls):
+            return "primes"
+
+        def schema(self):
+            return "p int"
+
+        def reader(self, schema: str):
+            return PrimesDataSourceReader(schema, self.options)
+
+
+    class PrimesDataSourceReader(DataSourceReader):
+        def __init__(self, schema, options):
+            self.schema: str = schema
+            self.options = options
+            self.min = 2
+            self.max = math.inf
+
+        def pushFilters(self, filters: List[Filter]) -> Iterable[Filter]:
+            """
+            Parameters
+            ----------
+            filters : list of Filter objects
+                The AND of the filters that Spark would like to push down
+
+            Returns
+            -------
+            iterable of Filter objects
+                Filters that could not be pushed down and still need to be
+                evaluated by Spark
+            """
+            for f in filters:
+                print(f"Got filter: {f}")
+                # Handle constraints on the range of numbers
+                if isinstance(f, EqualTo):
+                    self.min = max(self.min, f.value)
+                    self.max = min(self.max, f.value)
+                elif isinstance(f, GreaterThan):
+                    self.min = max(self.min, f.value + 1)
+                elif isinstance(f, LessThan):
+                    self.max = min(self.max, f.value - 1)
+                elif isinstance(f, GreaterThanOrEqual):
+                    self.min = max(self.min, f.value)
+                elif isinstance(f, LessThanOrEqual):
+                    self.max = min(self.max, f.value)
+                else:
+                    yield f  # Let Spark handle unsupported filters
+
+        def read(self, partition):
+            # Use the pushed filters to filter data during read
+            num = self.min
+            while num <= self.max:
+                if self._is_prime(num):
+                    yield [num]
+                num += 1
+
+        @staticmethod
+        def _is_prime(n: int) -> bool:
+            """Check if a number is prime."""
+            if n < 2:
+                return False
+            for i in range(2, int(n**0.5) + 1):
+                if n % i == 0:
+                    return False
+            return True
+
+    # Register the data source
+    spark.dataSource.register(PrimesDataSource)
+    spark.read.format("primes").load().filter("2000 <= p and p < 2050").show()
+
+    # Got filter: IsNotNull(attribute=('p',))
+    # Got filter: GreaterThanOrEqual(attribute=('p',), value=2000)
+    # Got filter: LessThan(attribute=('p',), value=2050)
+    # +----+
+    # |   p|
+    # +----+
+    # |2003|
+    # |2011|
+    # |2017|
+    # |2027|
+    # |2029|
+    # |2039|
+    # +----+
+
+**Notes**
+
+``pushFilters()`` is called only if there are filters available to push down.
+If it is called, the call happens before ``partitions()``.
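+
+Because ``pushFilters()`` runs before ``partitions()``, the pushed-down state can also be used to plan partitions. The following sketch builds on the ``PrimesDataSourceReader`` defined above; the ``RangePartition`` class and the chunk size are illustrative choices, not part of the API:
+
+.. code-block:: python
+
+    import math
+
+    from pyspark.sql.datasource import InputPartition
+
+
+    class RangePartition(InputPartition):
+        def __init__(self, start, end):
+            self.start = start  # inclusive lower bound
+            self.end = end  # exclusive upper bound
+
+
+    class PartitionedPrimesReader(PrimesDataSourceReader):
+        def partitions(self):
+            # If pushFilters() was called, self.min and self.max already hold
+            # the pushed-down bounds; otherwise they keep their defaults.
+            chunk = 1000
+            # Cap the open-ended case so the planned range stays finite.
+            upper = int(self.max) + 1 if self.max != math.inf else self.min + 10 * chunk
+            parts = [
+                RangePartition(lo, min(lo + chunk, upper))
+                for lo in range(self.min, upper, chunk)
+            ]
+            # Always return at least one (possibly empty) partition.
+            return parts or [RangePartition(self.min, self.min)]
+
+        def read(self, partition: RangePartition):
+            # Each task scans only its own slice of the pushed-down range.
+            for num in range(partition.start, partition.end):
+                if self._is_prime(num):
+                    yield [num]
+
+With this split, a filter such as ``p >= 2000 AND p < 50000`` is planned as a set of non-overlapping ranges of at most 1000 numbers each, which Spark can read in parallel. To use the subclass, ``PrimesDataSource.reader()`` would return a ``PartitionedPrimesReader`` instead of a ``PrimesDataSourceReader``.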
+
+**Supported Filter Types**
+
+Spark supports pushing down the following filter types:
+
+.. list-table::
+   :header-rows: 1
+
+   * - Filter Type
+     - Class
+     - SQL Equivalent
+   * - Equality
+     - ``EqualTo``
+     - ``column = constant``
+   * - Greater Than
+     - ``GreaterThan``
+     - ``column > constant``
+   * - Greater Than or Equal
+     - ``GreaterThanOrEqual``
+     - ``column >= constant``
+   * - Less Than
+     - ``LessThan``
+     - ``column < constant``
+   * - Less Than or Equal
+     - ``LessThanOrEqual``
+     - ``column <= constant``
+   * - IN list
+     - ``In``
+     - ``column IN (constants)``
+   * - IS NULL
+     - ``IsNull``
+     - ``column IS NULL``
+   * - IS NOT NULL
+     - ``IsNotNull``
+     - ``column IS NOT NULL``
+   * - String Contains
+     - ``StringContains``
+     - ``column LIKE '%constant%'``
+   * - String Starts With
+     - ``StringStartsWith``
+     - ``column LIKE 'constant%'``
+   * - String Ends With
+     - ``StringEndsWith``
+     - ``column LIKE '%constant'``
+   * - NOT
+     - ``Not``
+     - ``NOT filter``
+
+Only supported filters are passed to the ``pushFilters`` method.
+
+**Best Practices**
+
+1. **Handle what you can, pass back what you can't**: Implement pushdown for filter types that your data source can handle efficiently. Return the remaining filters for Spark to process.
+
+2. **Anticipate new filter types**: More filter types may be added in the future, so do not assume that your implementation handles every possible filter.
+
+3. **Use pushed filters throughout the execution**: Store the pushed filters and respect them in both ``partitions()`` and ``read()``.
+
+4. **Test performance**: Compare query performance with and without filter pushdown to ensure it provides the expected benefits.
+
 Usage Notes
 -----------