Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
13b828f
start experimenting with parquet statistics
rjzamora May 15, 2023
f5f4e19
Merge remote-tracking branch 'upstream/main' into pq-statistics-len
rjzamora May 15, 2023
990ba4c
adopt parts of #40
rjzamora May 16, 2023
1c62f4c
experimenting with dedicated Metadata class structure
rjzamora May 16, 2023
afd59d7
add missing file
rjzamora May 16, 2023
8302305
go back to and remove sub-class for now
rjzamora May 16, 2023
a3c5f2c
add parquet test
rjzamora May 16, 2023
cbced80
use assume vs inherit
rjzamora May 16, 2023
5fe5862
use assume vs inherit
rjzamora May 16, 2023
b0946f8
split test
rjzamora May 16, 2023
bfd8710
fix doc-string
rjzamora May 16, 2023
2d343c7
fix typos
rjzamora May 16, 2023
aa27c96
Merge remote-tracking branch 'upstream/main' into pq-statistics-len
rjzamora May 17, 2023
4ce604d
use _lengths ILO statistics
rjzamora May 17, 2023
4ad6fb2
start pushing on _column_statistics
rjzamora May 18, 2023
d5e93a4
add _collect_statistics machinery to ReadParquet
rjzamora May 18, 2023
7b137c5
move utilities out of class body
rjzamora May 18, 2023
f6823d1
introduce _partitioning
rjzamora May 19, 2023
e600ea1
add simple test coverage for _partitions
rjzamora May 19, 2023
1dbfb18
improve test and fix bug
rjzamora May 19, 2023
5020657
Merge remote-tracking branch 'upstream/main' into simple-statistics
rjzamora May 19, 2023
423cfcb
remove leftover
rjzamora May 19, 2023
58ebf5a
fix parquet len test
rjzamora May 19, 2023
5790fb1
fix calculate_divisions default
rjzamora May 22, 2023
4be0221
Merge remote-tracking branch 'upstream/main' into simple-statistics
rjzamora May 23, 2023
cc01ebb
strip out _partitioning changes
rjzamora May 23, 2023
0345d19
missing calculate_divisions default
rjzamora May 23, 2023
7052a26
move _lengths to a method with force option
rjzamora May 23, 2023
e26d6cd
cache pd lengths
rjzamora May 23, 2023
5c376b9
missing annotations import
rjzamora May 23, 2023
cd6a5d6
Merge remote-tracking branch 'upstream/main' into simple-statistics
rjzamora May 24, 2023
62fbcfa
Merge remote-tracking branch 'upstream/main' into HEAD
rjzamora May 30, 2023
253cfeb
use Lengths
rjzamora May 30, 2023
1318219
Merge remote-tracking branch 'upstream/main' into simple-statistics
rjzamora May 30, 2023
32e4f94
partial fixup
rjzamora May 30, 2023
bd5395a
improve testing
rjzamora May 30, 2023
be4af18
cleanup
rjzamora May 31, 2023
47ff1d3
remove _len for now
rjzamora May 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dask_expr/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from dask_expr.merge import Merge
from dask_expr.reductions import (
DropDuplicates,
Len,
MemoryUsageFrame,
MemoryUsageIndex,
NLargest,
Expand Down Expand Up @@ -85,6 +86,9 @@ def _meta(self):
def size(self):
return new_collection(self.expr.size)

def __len__(self):
    """Return the length of this collection.

    Wraps the underlying expression in a ``Len`` expression, turns that
    into a collection and computes it immediately.
    """
    length_expr = Len(self.expr)
    length_collection = new_collection(length_expr)
    return length_collection.compute()

@property
def nbytes(self):
raise NotImplementedError("nbytes is not implemented on DataFrame")
Expand Down
45 changes: 45 additions & 0 deletions dask_expr/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
is_dataframe_like,
is_index_like,
is_series_like,
make_meta,
)
from dask.dataframe.dispatch import meta_nonempty
from dask.utils import M, apply, funcname, import_required, is_arraylike
Expand Down Expand Up @@ -636,6 +637,23 @@ def visualize(self, filename="dask-expr.svg", format=None, **kwargs):
return g


class Literal(Expr):
    """Expression node that wraps an already-known (literal) value.

    The single operand ``value`` is handed back unchanged by the one task
    this expression produces.
    """

    _parameters = ["value"]

    @property
    def _meta(self):
        # Metadata is derived directly from the wrapped value.
        return make_meta(self.value)

    def _divisions(self):
        # Reported as a pair of ``None`` boundaries.
        return (None,) * 2

    def _task(self, index: int):
        # Only index 0 is valid for this expression.
        assert index == 0
        return self.value


class Blockwise(Expr):
"""Super-class for block-wise operations

Expand Down Expand Up @@ -1031,6 +1049,33 @@ def _task(self, index: int):
)


class Lengths(Expr):
    """Expression that evaluates to a tuple with the length of every
    partition of ``frame``."""

    _parameters = ["frame"]

    @property
    def _meta(self):
        return ()

    def _divisions(self):
        return (None, None)

    def _simplify_down(self):
        """Look through an ``Elemwise`` frame to one of its dependencies.

        When ``frame`` is an ``Elemwise`` expression, the lengths are taken
        from the dependency with the most partitions instead. Returns
        ``None`` for any other kind of frame.
        """
        if not isinstance(self.frame, Elemwise):
            return None
        widest = max(self.frame.dependencies(), key=lambda dep: dep.npartitions)
        return Lengths(widest)

    def _layer(self):
        """Build the task graph: one ``len`` task per partition of ``frame``
        plus a final task that gathers the results into a tuple."""
        part_name = "part-" + self._name
        graph = {}
        for i in range(self.frame.npartitions):
            graph[(part_name, i)] = (len, (self.frame._name, i))
        # The right-hand side is evaluated before the final key is inserted,
        # so the list contains only the per-partition keys.
        graph[(self._name, 0)] = (tuple, list(graph))
        return graph


class ResetIndex(Elemwise):
"""Reset the index of a Series or DataFrame"""

Expand Down
27 changes: 26 additions & 1 deletion dask_expr/io/io.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

import functools
import math

from dask.dataframe.io.io import sorted_division_locations

from dask_expr.expr import Blockwise, Expr, PartitionsFiltered
from dask_expr.expr import Blockwise, Expr, Lengths, Literal, PartitionsFiltered
from dask_expr.reductions import Len


class IO(Expr):
Expand Down Expand Up @@ -44,6 +47,7 @@ class FromPandas(PartitionsFiltered, BlockwiseIO):

# Operand names for this expression (presumably consumed by the ``Expr``
# base class -- confirm).
_parameters = ["frame", "npartitions", "sort", "_partitions"]
# Values for some of the operands above; presumably used when an operand is
# not supplied explicitly -- confirm against the base class.
_defaults = {"npartitions": 1, "sort": True, "_partitions": None}
# Cache for the per-partition lengths; ``None`` until ``_get_lengths`` has
# computed them.
_pd_length_stats = None

@property
def _meta(self):
Expand All @@ -68,6 +72,27 @@ def _divisions_and_locations(self):
divisions = (None,) * len(locations)
return divisions, locations

def _get_lengths(self) -> tuple | None:
    """Return the number of rows in each selected partition.

    Lengths are the differences between consecutive entries of
    ``self._locations()``. The result is computed once and cached on
    ``self._pd_length_stats``.

    When the expression is filtered, one length is produced per entry of
    ``self._partitions``, in that order, so reordered or repeated
    selections are reflected in the result.
    NOTE(review): this assumes output partition ``k`` corresponds to source
    partition ``self._partitions[k]`` -- confirm against PartitionsFiltered.

    Returns
    -------
    tuple
        One integer length per selected partition.
    """
    if self._pd_length_stats is None:
        locations = self._locations()
        n_source = len(locations) - 1
        if self._filtered:
            selected = self._partitions
        else:
            selected = range(n_source)
        # Indices outside the source range are skipped, matching what the
        # previous membership-based filter did.
        self._pd_length_stats = tuple(
            locations[i + 1] - locations[i]
            for i in selected
            if 0 <= i < n_source
        )
    return self._pd_length_stats

def _simplify_up(self, parent):
    """Replace a length query on this frame with a precomputed ``Literal``.

    A ``Lengths`` parent becomes a ``Literal`` holding the tuple of
    partition lengths, and a ``Len`` parent becomes a ``Literal`` holding
    their sum. Returns ``None`` for any other parent, or when the lengths
    tuple is empty.
    """
    if not isinstance(parent, (Lengths, Len)):
        return None

    known_lengths = self._get_lengths()
    if not known_lengths:
        return None

    if isinstance(parent, Lengths):
        return Literal(known_lengths)
    return Literal(sum(known_lengths))

def _divisions(self):
return self._divisions_and_locations[0]

Expand Down
Loading