Row Lineage and Change Data Feed #4935
Replies: 4 comments 4 replies
-
@jackye1995 Thanks for summarizing and writing down the design for row lineage and CDF. I agree that distinguishing between the two concepts of row lineage and CDF is a good idea. Do we need to split the work into multiple subtasks, or combine them with CDF as in my PR or your experimental PR? The benefit of combining is that we can use CDF to test row lineage; the disadvantage is that the PR becomes huge.
Is this metadata also new for Lance?
-
What's more, shall we introduce an API design to expose the row lineage capability, or just reuse the
-
Another aspect of CDF is the changed columns in a row. Traditionally, this requires getting the pre- and post-image of the row and then comparing each column to determine which columns changed in value. I think with Lance's 2-dimensional table concept and the row lineage feature we are building here, there could be a better solution. Similar to how we track rows, we can also track the creation time, update time, and deletion time of a column at the fragment level, and at CDF generation time we can use the 2x2 information to produce an accurate result. This is just a thought at the moment; I'm putting it here in case anyone would like to explore it more.
-
I have two questions – could you reply when you're available?
-
Vino (@yanghua) and I have been designing the row lineage and change data feed features for some time now (#4895, #4741). I think it's better for me to write down the full design I have in mind after experimenting with various approaches.
Prior Art
In general, row lineage and CDF are different features, and both Iceberg and Delta implement them separately. In fact, CDF predates row lineage in both formats, and the two are not very compatible.
However, I think there is a path to completely reuse row lineage to perform CDF. A key benefit is that CDF becomes completely decoupled from table versioning. This is important because you might want to consume CDF at any point in time (e.g., a downstream job refreshing an MV based on it), and if the table maintenance process removes old versions, the CDF should remain consumable.
Today Iceberg relies on historical versions being present, which makes coordination difficult. Also, using historical versions to construct CDF can be quite inefficient. Delta's CDF produces additional, independent CDF files, but this essentially mirrors all writes twice, so the write performance impact is large.
Design
Row Lineage
We want to introduce the following row lineage columns:
1. `_rowid` with stable row ID
2. `_row_created_at_version`
3. `_row_last_updated_at_version` (new in Lance)

For 2 and 3, in general we track these metadata in the manifest's fragments. These metadata are typically very small, because most rows in a fragment are added and updated at the same time. We use a run-length encoded sequence to record them:
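As a minimal illustration of the idea (this is a sketch in Python, not Lance's actual encoding; the function names and the `(run_length, version)` pair layout are assumptions):

```python
def rle_encode(versions):
    """Compress a per-row version list into (run_length, version) runs."""
    runs = []
    for v in versions:
        if runs and runs[-1][1] == v:
            runs[-1][0] += 1
        else:
            runs.append([1, v])
    return [(n, v) for n, v in runs]

def rle_decode(runs):
    """Expand runs back into one version per row."""
    return [v for n, v in runs for _ in range(n)]

# Most rows in a fragment are created/updated at the same version,
# so the encoded sequence is typically a single run.
created = [3, 3, 3, 3, 3]
updated = [3, 3, 7, 7, 3]
print(rle_encode(created))  # [(5, 3)]
print(rle_encode(updated))  # [(2, 3), (2, 7), (1, 3)]
```

The common case compresses to one pair per fragment, which is why the metadata stays very small.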
By tracking both the created and updated versions, we can now get the rows inserted or updated between v1 and v2 easily as:
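A minimal sketch of that lookup, assuming the lineage columns are materialized per row; the function and the exact filter semantics (v1 exclusive, v2 inclusive) are assumptions for illustration, not Lance's actual API:

```python
def changed_rows(rows, v1, v2):
    """rows: dicts carrying _row_created_at_version and
    _row_last_updated_at_version (per the lineage columns above)."""
    # Inserted: the row first appeared after v1 and by v2.
    inserted = [r for r in rows
                if v1 < r["_row_created_at_version"] <= v2]
    # Updated: the row existed at v1 but was last touched in (v1, v2].
    updated = [r for r in rows
               if r["_row_created_at_version"] <= v1
               and v1 < r["_row_last_updated_at_version"] <= v2]
    return inserted, updated
```

Because the check is a pure predicate on the two lineage columns, it needs no access to historical table versions.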
Handling Deleted Rows
Similarly, we can also track `_row_deleted_at_version`. However, we face 2 challenges:

For 1, we can introduce a mode to query the deleted rows. When this mode is enabled, we will not remove the rows with a delete marker, but instead show them with a non-null `_row_deleted_at_version`. For this design, let's say in SQL we have a special system table `table$deleted_rows` which behaves like this.

For 2, we can introduce a system index, let's call it the `deleted_rows_index`. When either case in 2 happens, we will update the `deleted_rows_index` with pointers to the fragments that contain those deleted rows. This is pretty much similar to how we already record an extra fragment reuse index during compaction. And similar to the fragment reuse index, this index can be maintained independently to fulfill the business need. For example, a table can declare that it provides CDF for the past N versions or X days as a service level agreement, without needing to worry about how the table maintenance process trims old table versions.

Then, going back to 1, `SELECT * FROM table$deleted_rows WHERE _row_deleted_at_version IS NOT NULL` basically becomes combining all the rows in the `deleted_rows_index` plus the rows in the table that are marked as deleted in deletion vectors.

And we can now answer the question of "what rows are deleted between v1 and v2" by:
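A sketch of that combination, assuming both sources expose `_row_deleted_at_version`; all names here are illustrative, not Lance's actual API:

```python
def deleted_between(index_rows, dv_marked_rows, v1, v2):
    """index_rows: rows reachable through the deleted_rows_index.
    dv_marked_rows: rows still in the table but marked deleted in
    deletion vectors. Both carry _row_deleted_at_version."""
    # Union the two sources, then keep only deletions in (v1, v2].
    candidates = index_rows + dv_marked_rows
    return [r for r in candidates
            if r["_row_deleted_at_version"] is not None
            and v1 < r["_row_deleted_at_version"] <= v2]
```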
Constructing CDF
Now with these 2 prerequisites, we can construct the full CDF between any 2 versions. First, we union the results of the previous 2 steps. Then, for each row ID, we need to deal with the situation that it contains either:

The output schema is `row_id, version, operation, pre_image, post_image`; a delete is the pre-image of its next delete or update.
The key is that all of this can be described as a SQL transformation, which means we can now just run a DataFusion SQL query to get the CDF:
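A minimal sketch of such a transformation, assuming a `changes` relation that unions the inserted, updated, and deleted rows; both the SQL string and the Python equivalent are illustrative, not Lance's actual query:

```python
from itertools import groupby
from operator import itemgetter

# Illustrative DataFusion-style SQL: LAG() supplies each change's
# pre-image from the previous change of the same row_id.
CDF_SQL = """
SELECT row_id, version, operation,
       LAG(post_image) OVER (PARTITION BY row_id ORDER BY version) AS pre_image,
       post_image
FROM changes
"""

def build_cdf(changes):
    """Pure-Python equivalent of the LAG() query above.

    changes: dicts with row_id, version, operation, post_image,
    i.e. the union of the inserted, updated, and deleted rows."""
    out = []
    ordered = sorted(changes, key=itemgetter("row_id", "version"))
    for _, group in groupby(ordered, key=itemgetter("row_id")):
        prev_image = None  # previous change's image is this change's pre-image
        for change in group:
            out.append({**change, "pre_image": prev_image})
            prev_image = change["post_image"]
    return out
```

Expressing the whole thing as one SQL transformation is what lets the engine (DataFusion here) do the per-row ordering and pre-image pairing without any bespoke CDF machinery.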