# Patch summary (leading text before the first "diff --git" header; not part of any hunk).
#
# Adds an experimental "backend exec" path that evaluates an expression
# directly instead of building a task graph:
#
#   * _collection.py: new ``exec(self, simplify=True)`` method; optionally
#     calls ``self.simplify()`` and then returns ``out.expr.__exec__()``.
#   * _core.py: default ``__exec__`` that raises ``NotImplementedError``
#     naming ``type(self)``.
#   * _expr.py: ``__exec__`` that calls ``__exec__()`` on every operand in
#     ``self._args`` that is an ``Expr`` and passes the results to
#     ``self.operation`` together with ``self._kwargs``.
#   * _groupby.py (GroupbyAggregationBase): ``__exec__`` that executes
#     ``self.frame`` and returns ``frame.groupby(self.by, ...).aggregate(self.arg)``
#     with ``sort`` plus the ``observed`` / ``dropna`` entries from ``_as_dict``.
#   * _merge.py: ``__exec__`` that executes ``left`` and ``right`` and returns
#     ``left.merge(right, **self.kwargs)``.
#   * _shuffle.py: one ``__exec__`` that calls ``set_index`` on the executed
#     frame (executing ``self._other`` first when it is an ``Expr``), and one
#     that applies ``self.sort_function`` with ``self.sort_function_kwargs``.
#   * io/io.py: ``__exec__`` that reads ``self.operand("frame")._data`` and
#     projects it with ``self.columns`` (first column only when ``self._series``).
#   * tests/test_collection.py: ``test_exec`` compares ``result.exec()`` with
#     the equivalent pandas computation.
#
# NOTE(review): leading whitespace inside the hunks was reconstructed from
# Python syntax; verify context-line indentation against the target files
# before applying this patch.
diff --git a/dask_expr/_collection.py b/dask_expr/_collection.py
index a87e2849e..37548d5d7 100644
--- a/dask_expr/_collection.py
+++ b/dask_expr/_collection.py
@@ -553,6 +553,19 @@ def pprint(self):
     def dask(self):
         return self.__dask_graph__()
 
+    def exec(self, simplify=True):
+        """Directly execute with the backend DataFrame library
+
+        WARNING: This is an experimental feature. Use at your own risk.
+
+        This function will NOT convert the expression to a task graph
+        and execute with dask. Instead, the backend library will be
+        used to execute the logic defined by the ``Expr.__exec__``
+        protocols directly.
+        """
+        out = self.simplify() if simplify else self
+        return out.expr.__exec__()
+
     def __dask_graph__(self):
         out = self.expr
         out = out.lower_completely()
diff --git a/dask_expr/_core.py b/dask_expr/_core.py
index 0eded2505..6b350ff88 100644
--- a/dask_expr/_core.py
+++ b/dask_expr/_core.py
@@ -61,6 +61,11 @@ def __new__(cls, *args, **kwargs):
         Expr._instances[_name] = inst
         return inst
 
+    def __exec__(self):
+        raise NotImplementedError(
+            f"Backend exec is not yet supported for {type(self)}."
+        )
+
     def _tune_down(self):
         return None
 
diff --git a/dask_expr/_expr.py b/dask_expr/_expr.py
index 9aaea9b1d..beaaf81a7 100644
--- a/dask_expr/_expr.py
+++ b/dask_expr/_expr.py
@@ -495,6 +495,10 @@ def _meta(self):
         args = [op._meta if isinstance(op, Expr) else op for op in self._args]
         return self.operation(*args, **self._kwargs)
 
+    def __exec__(self):
+        args = [op.__exec__() if isinstance(op, Expr) else op for op in self._args]
+        return self.operation(*args, **self._kwargs)
+
     @functools.cached_property
     def _kwargs(self) -> dict:
         if self._keyword_only:
diff --git a/dask_expr/_groupby.py b/dask_expr/_groupby.py
index 129b03623..fff8eec2a 100644
--- a/dask_expr/_groupby.py
+++ b/dask_expr/_groupby.py
@@ -368,6 +368,15 @@ class GroupbyAggregationBase(GroupByApplyConcatApply, GroupByBase):
         "_slice": None,
     }
 
+    def __exec__(self):
+        frame = self.frame.__exec__()
+        kwargs = {
+            "sort": self.sort,
+            **_as_dict("observed", self.observed),
+            **_as_dict("dropna", self.dropna),
+        }
+        return frame.groupby(self.by, **kwargs).aggregate(self.arg)
+
     @functools.cached_property
     def spec(self):
         # Converts the `arg` operand into specific
diff --git a/dask_expr/_merge.py b/dask_expr/_merge.py
index 4c4fb69d0..b543866ec 100644
--- a/dask_expr/_merge.py
+++ b/dask_expr/_merge.py
@@ -195,6 +195,11 @@ def _meta(self):
             kwargs["how"] = "left"
         return make_meta(left.merge(right, **kwargs))
 
+    def __exec__(self):
+        left = self.left.__exec__()
+        right = self.right.__exec__()
+        return left.merge(right, **self.kwargs)
+
     @functools.cached_property
     def _npartitions(self):
         if self.operand("_npartitions") is not None:
diff --git a/dask_expr/_shuffle.py b/dask_expr/_shuffle.py
index e7e9804cc..be532c858 100644
--- a/dask_expr/_shuffle.py
+++ b/dask_expr/_shuffle.py
@@ -815,6 +815,14 @@ def _meta(self):
             other = self._other
         return self.frame._meta.set_index(other, drop=self.drop)
 
+    def __exec__(self):
+        frame = self.frame.__exec__()
+        if isinstance(self._other, Expr):
+            other = self._other.__exec__()
+        else:
+            other = self._other
+        return frame.set_index(other, drop=self.drop)
+
     @property
     def _divisions_column(self):
         return self.other
@@ -996,6 +1004,10 @@ def sort_function_kwargs(self):
     def _meta(self):
         return self.frame._meta
 
+    def __exec__(self):
+        frame = self.frame.__exec__()
+        return self.sort_function(frame, **self.sort_function_kwargs)
+
     @functools.cached_property
     def _meta_by_dtype(self):
         dtype = self._meta.dtypes[self.by]
diff --git a/dask_expr/io/io.py b/dask_expr/io/io.py
index 1dc82f597..0d3727f8a 100644
--- a/dask_expr/io/io.py
+++ b/dask_expr/io/io.py
@@ -407,6 +407,12 @@ def _meta(self):
             return meta[self.columns[0]] if self._series else meta[self.columns]
         return meta
 
+    def __exec__(self):
+        pdf = self.operand("frame")._data
+        if self.columns:
+            return pdf[self.columns[0]] if self._series else pdf[self.columns]
+        return pdf
+
     @functools.cached_property
     def columns(self):
         columns_operand = self.operand("columns")
diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py
index f59559eef..51202f999 100644
--- a/dask_expr/tests/test_collection.py
+++ b/dask_expr/tests/test_collection.py
@@ -2665,3 +2665,11 @@ def test_empty_from_pandas_projection():
     df["foo"] = from_pandas(foo, npartitions=1)
     pdf["foo"] = foo
     assert_eq(df["foo"], pdf["foo"])
+
+
+def test_exec():
+    pdf = pd.DataFrame({"a": [1, 2, 3, 4, 5, 6], "b": 1, "c": 2})
+    df = from_pandas(pdf.copy())
+    result = (df + 1).sort_values("a")["a"]
+    result_pd = (pdf + 1).sort_values("a")["a"]
+    assert_eq(result.exec(), result_pd)