
Conversation


@rjzamora rjzamora commented May 19, 2023

Supersedes #84

  • Implements __len__
  • Adds a new Lengths expression to return a tuple of partition lengths.
  • Adds a _lengths property and a _partitioning method to Expr. These are essentially mechanisms to track "known" partition lengths and partitioning information.
  • Adds new logic to ReadParquet to collect and use parquet-metadata statistics to implement ReadParquet._lengths and ReadParquet._partitioning (see the usage sketch below).
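
A rough usage sketch of what this enables (the dataset path is illustrative, and dx is assumed to be the dask_expr namespace, as elsewhere in this thread):

import dask_expr as dx

# When parquet row-count statistics are available, len() can be answered from
# metadata via the new Len/Lengths expressions instead of computing the data.
df = dx.read_parquet("dataset/")
n_rows = len(df)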

@rjzamora rjzamora changed the title Simple statistics Use parquet statistics for length and partitioning information May 19, 2023
Return
------
partitioning: dict
"""
Member

I'd welcome a conversation about this.

My initial thought was that it made sense to store some baseline information like ...

  1. row counts of each partition
  2. min/max values of each column in each partition

These are similar to what comes out of parquet. Then, when we wanted to ask something, we would consult that raw data.

This feels like we're now storing derivative values off of that data. This makes me slightly nervous because it opens the door to tracking lots of state. I would be more comfortable if we were to track the underlying state (counts, mins, maxes) and then decided to compute quantities like these on the fly. That feels more tightly scoped to me.
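
A minimal sketch of what storing only the raw per-partition statistics and deriving quantities on the fly could look like (all names here are hypothetical, not part of this PR):

from dataclasses import dataclass
from typing import Any


@dataclass
class PartitionStats:
    num_rows: int         # row count of the partition
    mins: dict[str, Any]  # per-column minimum values
    maxes: dict[str, Any] # per-column maximum values


def total_length(stats: list[PartitionStats]) -> int:
    # Derived on the fly from the raw row counts.
    return sum(s.num_rows for s in stats)


def sorted_on(stats: list[PartitionStats], column: str) -> bool:
    # Derived on the fly from the raw min/max values: partitions must not overlap.
    bounds = [(s.mins[column], s.maxes[column]) for s in stats]
    return all(lo <= hi for lo, hi in bounds) and all(
        bounds[i][1] <= bounds[i + 1][0] for i in range(len(bounds) - 1)
    )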

Thoughts?

Member Author

This PR does a few different things, and some of those "things" I am much more confident in than others.

  1. We add an optional _lengths attribute to Expr to store "known" partition lengths.
  2. We add an optional _partitioning method to Expr so that an expression can check if the underlying collection is partitioned by a specific set of columns (even if those columns do not correspond to an index with known divisions).
  3. We add logic to some Expr classes (mostly ReadParquet) to "lazily" collect the necessary statistics when _lengths or _partitioning information is requested.

The primary reason this PR is still marked as "draft" is that the current iteration will always attempt to go back and collect statistics in ReadParquet when _lengths or _partitioning are called (and the necessary statistics are missing). While it will always make sense to collect partition-length statistics in support of something like len(df), it may not always be the best idea to collect statistics. In fact, I'm already a bit uncomfortable with the fact that column-projection and predicate-pushdown optimizations currently require us to repeat the initial dataset processing, which can be slow on some systems (this is something I'd like to address separately).

Note that I also think the specific API can be improved, but the "eagerness" of the lazy-metadata collection feels like the most challenging short-term blocker.

What you seem to be uncomfortable with is the fact that we are not adding something like Expr._mins and Expr._maxes, but are instead exposing a method to provide more general (derivative) information about how the collection is partitioned. I'm very open to other approaches. My current proposal here was just the natural result of attempting to store mins/maxes, and finding that my personal attempt at doing so was not particularly clean or useful. In most cases, the original ReadParquet expression will not collect useful min/max statistics. When the expression does collect min/max statistics, the only reason we care about them is to tell us how/if the collection is partitioned. For this reason, I found it most natural to allow specific classes (like ReadParquet and Shuffle) to worry about what kinds of statistics they want to collect/track (if any).

I'll think a bit more about this.

Member

I'm already a bit uncomfortable with the fact that column-projection and predicate-pushdown optimizations currently require us to repeat the initial dataset processing, which can be slow on some systems (this is something I'd like to address separately).

I noticed this recently. I wonder if the parquet code could benefit from a module-level lru-cache
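
For illustration, a module-level cache along those lines might look like this (the function below is a hypothetical stand-in for the expensive dataset-processing step, not the reader's real API):

from functools import lru_cache


@lru_cache(maxsize=16)
def dataset_info(path: str, columns: tuple = ()) -> dict:
    # Arguments must be hashable (tuples, not lists) for lru_cache to apply.
    print(f"processing {path} ...")  # only runs on a cache miss
    return {"path": path, "columns": columns}


dataset_info("data/", ("x", "y"))  # cache miss: does the expensive work
dataset_info("data/", ("x", "y"))  # cache hit: repeated optimization passes stay cheap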

Member

What you seem to be uncomfortable with is the fact that we are not adding something like Expr._mins and Expr._maxes, but are instead exposing a method to provide more general (derivative) information about how the collection is partitioned

Yeah, I'm comparing this to the database world, where you have a reference table which is the single point of truth (SPOT) and then views on that table. This feels like we're storing the views as concrete tables. Bad things tend to result from that behavior.

As an example, I could imagine future applications aside from sortedness. We've mentioned a couple of these, including filtering / partition pruning and optimizations that are based on the values. I think that storing the underlying data is more future-proof.

I probably wouldn't have separate protocols for _maxes and _mins, but maybe a single protocol that includes _min_maxes or all column-based statistics, provided those statistics are likely to be consistent across the systems that expose this information (it might make sense to look at what Snowflake, Parquet, and Delta all provide, for example).
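
For illustration, such a protocol might look roughly like this (the name and the statistics payload are assumptions; nothing this concrete was proposed here):

from typing import Any, Protocol


class SupportsColumnStatistics(Protocol):
    def _column_statistics(self) -> dict[str, dict[str, list[Any]]]:
        """Return already-known per-partition statistics for each column,
        e.g. {"x": {"min": [...], "max": [...], "null_count": [...]}}."""
        ...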

Member Author

After thinking about this a bit more, I'm planning to split this work into two distinct proposals: (1) Tracking and using partition-length statistics, and (2) tracking and using min/max statistics.

I'm expecting that we will be able to agree on a design for (1) a lot faster than (2).

I also expect (1) to be a bit more valuable than (2) in the short term. In my experience, it can be useful to know column mins/maxes immediately after IO. However, it would be much more valuable to have a _partitioning-like method/utility to tell us if a collection is partitioned by a given set of columns. I'd expect such a method to consult min/max statistics (if known), but the more-common case would be that the collection was recently shuffled/joined/grouped on the columns in question.

To summarize: I think storing/using length-based statistics is useful and easier to agree on in the short term, so I will probably focus on that first. I don't personally care much about min/max statistics unless they are in support of a _partitioning-like method. So, I'll probably hold off on that work until there is some consensus on what that API should look/behave like.
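
A rough sketch of how such a _partitioning-style check might behave (the attribute names and statistics layout below are purely illustrative):

def partitioned_by(expr, columns: tuple) -> bool:
    # Common case: the collection was recently shuffled/joined/grouped on these columns.
    if getattr(expr, "_shuffled_on", None) == columns:
        return True
    # Fallback: consult min/max statistics only if they are already known;
    # never trigger new IO from inside an optimization pass.
    stats = getattr(expr, "_known_minmax", None)  # e.g. {"x": [(lo0, hi0), (lo1, hi1), ...]}
    if stats is None or any(c not in stats for c in columns):
        return False
    # Partitioned-by holds when per-partition ranges are non-overlapping for each column.
    return all(
        all(stats[c][i][1] <= stats[c][i + 1][0] for i in range(len(stats[c]) - 1))
        for c in columns
    )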

@rjzamora rjzamora changed the title Use parquet statistics for length and partitioning information Implement __len__ and leverage parquet statistics May 23, 2023
@rjzamora rjzamora marked this pull request as ready for review May 23, 2023 18:34
@rjzamora
Member Author

@mrocklin - Let me know if you are concerned by any of the changes remaining in this PR. I agree with your off-line suggestion that we could also introduce another method like Expr._maxima to track min/max statistics. However, I'd like to tackle that in a follow-up.

@mrocklin mrocklin left a comment

OK, I'm sorry to have this drag on for so long, but please bear with me here.

What if we don't create a new protocol on expressions here, but instead make a new expression, Lengths:

class Lengths(Expr):
    _parameters = ["frame"]

    @property
    def _meta(self):
        return []

    def _simplify_down(self):
        if isinstance(self.frame, Elemwise):
            return Lengths(self.frame.operands[0])

        if isinstance(self.frame, ReadParquet):
            return Literal(self.frame._get_lengths())

    def _layer(self):
        # A single output task whose value is a list of (len, partition)
        # subtasks; dask evaluates these to the per-partition lengths.
        return {
            (self._name, 0): [
                (len, (self.frame._name, i))
                for i in range(self.frame.npartitions)
            ]
        }

Does this give you what you want? If so, I like it because we use an existing extension mechanism (Operations/Expr subclasses) rather than make something new. I'm also ok adding new methods, but this seems like it might work and invent one fewer thing.

Comment on lines 349 to 353
_lengths = self.frame._lengths(force=True)
if _lengths:
    return Literal(sum(_lengths))
elif isinstance(self.frame, Elemwise):
    child = max(self.frame.dependencies(), key=lambda expr: expr.npartitions)
Member

It looks like we're doing a forced length computation before pushing through Elemwise. This seems unwise to me. I would probably do the cheap/free thing first of pushing through Elemwise operations before doing anything else.

Member

In general the force keyword opens up some questions I think. When do we use it, when don't we, when do we expect users to do this explicitly?

@rjzamora
Member Author

Does this give you what you want?

I'm not completely sure. I was pushing on a similar design earlier on, but moved away from it when I started considering how I would need min/max statistics to be accessed/used.

To understand the limitations of an Expr-based approach, consider the case that the user calls set_index, and you want to be able to check if the collection is already sorted by the selected column. If we rely on something like MinMax(Expr), (I think) we would also need some other mechanism to tell us if computing this expression would require "real" data to be read into memory. That is, I would only want to use MinMax to optimize a set_index operation if I knew it would never require any real IO/computation.

Perhaps one reasonable approach would be to call these expressions something like MinMaxStats and LengthStats, and explicitly prohibit them from reading in "real" data. This way, optimization logic can use/compute a MinMaxStats expression to query for known statistics without risking any real/expensive IO. We could also introduce the Lengths expression you have suggested, but allow ReadParquet to replace Lengths with LengthStats in _simplify_up.

Overall, I think I'm saying that I like the idea of leveraging the Expr foundation to query/leverage statistics. However, I think it is important that we clearly distinguish "pure" statistics-scanning expressions in some way. What do you think?
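
For concreteness, a minimal sketch of the "pure" statistics-expression idea, built on the Expr and Literal classes already used in this repo (the class name and the cached attribute are hypothetical):

class LengthStats(Expr):
    """Simplify to a Literal of partition lengths only if they are already known."""

    _parameters = ["frame"]

    def _simplify_down(self):
        # Consult metadata that has already been collected; never trigger IO
        # or real computation from inside an optimization pass.
        known = getattr(self.frame, "_cached_partition_lengths", None)
        if known is not None:
            return Literal(tuple(known))
        # Otherwise stay un-simplified, signalling that the statistics are unknown.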

@mrocklin
Member

Yeah, so I think that this gets at my other question of "exactly what are the semantics around the force= keyword?" I agree generally with your framing.

Using the Expr approach in an operation like set_index we would probably do something like the following:

def set_index(self, column):
    minmaxes = self.minmax(column).optimize()
    if isinstance(minmaxes, Literal):  # 🎉
        ...  # do something easy
    if hasattr(minmaxes.frame, "minmaxes"):
        ...  # consider calling this method
    ...

If we rely on something like MinMax(Expr), (I think) we would also need some other mechanism to tell us if computing this expression would require "real" data to be read into memory. That is, I would only want to use MinMax to optimize a set_index operation if I knew it would never require any real IO/computation.

Agreed. I think that that information is present in the optimized expression tree.
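
For example, a caller could inspect the optimized expression for remaining IO before deciding whether to compute it. A rough sketch, assuming a tree-traversal helper like Expr.walk() and the df.minmax() method from the sketch above (both hypothetical here):

def requires_io(expr) -> bool:
    # True if any IO expression (e.g. ReadParquet) survives optimization;
    # walk() is assumed to yield every node in the expression tree.
    return any(isinstance(e, ReadParquet) for e in expr.walk())


minmaxes = df.minmax("x").optimize()
if isinstance(minmaxes, Literal) or not requires_io(minmaxes):
    ...  # safe: computing the statistics will not read real data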

Comment on lines 348 to 354
def _simplify_down(self):
    _lengths = Lengths(self.frame).optimize()
    if isinstance(_lengths, Literal):
        return Literal(sum(_lengths.value))
    elif isinstance(self.frame, Elemwise):
        child = max(self.frame.dependencies(), key=lambda expr: expr.npartitions)
        return Len(child)
Member Author

I'm not sure if there is a better way to design the interaction between Len and Lengths. It seemed reasonable to me to switch from Len to Lengths when we know that the Lengths expression can be optimized down to a Literal expression.

Member

It may be that there are cases where we can move Len but not Lengths. For example:

df = dx.read_csv(...)
df = df.set_index("...")
len(df)

I'm not going to be able to pass Lengths through the set_index call, but I can pass Len through (the number of rows is the same after the full shuffle). Because of this, I think that they probably need to remain separate operations with separate optimization paths. I don't think that we can define one in terms of the other.

Also, I suspect that Len is likely to be more common than Lengths, so I'm disinclined to have it inherit any weaknesses from the other.
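
A hypothetical rule along those lines (the SetIndex class name and its frame operand are assumptions, not code from this PR):

# Sketch of a branch inside Len._simplify_down:
def _simplify_down(self):
    if isinstance(self.frame, SetIndex):
        # A full shuffle changes partition boundaries but not the total row
        # count, so Len can be pushed straight through; Lengths cannot.
        return Len(self.frame.frame)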

Member

I think it would be better to keep the Len(Elemwise) branch of this on top. Otherwise we're constructing a Lengths object and optimizing it every pass through optimizations (and there are likely to be several of these).

In principle, I wouldn't mind removing Lengths from this entirely, and instead have a ReadParquet._simplify_up(Len) case which is similar to but simpler than the Lengths case.
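
A sketch of that alternative, reusing the _get_lengths helper from the Lengths proposal above (treat the exact method placement and names as assumptions):

# Sketch of a case inside ReadParquet._simplify_up:
def _simplify_up(self, parent):
    if isinstance(parent, Len):
        lengths = self._get_lengths()     # row counts from parquet metadata, if available
        if lengths is not None:
            return Literal(sum(lengths))  # answer len() directly, no Lengths expression needed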


def _simplify_down(self):
    if isinstance(self.frame, Elemwise):
        return Lengths(self.frame.operands[0])
Member

I was simplifying things before. You'll have to watch out for situations like x.sum() + x. I think that if you look at the implementation for Len we handle this well.


@mrocklin mrocklin left a comment

Minor comment. Generally I'm good to merge.

Does this satisfy your needs in projects like Merlin?


@rjzamora
Member Author

Does this satisfy your needs in projects like Merlin?

Yes, this optimized len approach should satisfy the needs of Merlin (and other users who want to stream partitions into PyTorch/TensorFlow-based data loaders).

@rjzamora rjzamora merged commit 75b8eb2 into dask:main May 31, 2023
@rjzamora rjzamora deleted the simple-statistics branch May 31, 2023 14:44