feat: [HUDI-9766] Support for show_timeline Procedure with appropriate start and end time for both active and archive timelines #14261
base: master
Conversation
…ring and generic SQL filters on top
```scala
val limitedInstants = if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
  sortedInstants
} else {
  sortedInstants.take(limit)
```
If we have a final limit on the timeline instants, do we still need to add a limit to the timeline API?
…rchived timeline partial loading (stop after limit / avoid loading entire archived timeline)
nsivabalan left a comment:
Sharing some feedback for now; will review the rest later today.
```scala
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase

class TestShowTimelineProcedure extends HoodieSparkSqlTestBase {
```
Let's try to cover all the cases listed below, for both v1 and v2 (a minimal sketch of one such scenario follows after this list):
- limit 50.
- both active and archived 20, where active contains 40.
- both active and archived 20, where active contains 10.
- start within active timeline.
- end within active timeline.
- start and end within active timeline.
- start in archived, but archived not explicitly enabled.
- start in archived, archived enabled.
- start in archived and end in active timeline, archived not enabled.
- start in archived and end in active timeline, archived enabled.
- start and end in archived, archived not enabled.
- start and end in archived, archived enabled.

Commit types to cover:
- completed commits.
- inflight commits.
- completed deltacommits (MOR).
- inflight deltacommits (MOR).
- clean commits.
- clean inflight commits.
- completed compaction.
- inflight compaction.
- completed replace commits -> clustering.
- inflight replace commits -> clustering.
- completed replace commits -> insert overwrite.
- inflight replace commits -> insert overwrite.
- completed rollback.
- pending rollback.

In the active timeline we should have a mix of inflight and completed; in the archived timeline we can only have completed, and that's fine.
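To make one of these scenarios concrete, here is a minimal sketch; it assumes a SparkSession with the Hudi SQL extensions enabled and an existing table named test_tbl, and the instant times, parameter names, and assertions are illustrative only (they follow the PR description, not the final test code).

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical scenario: startTime and endTime both fall inside the active timeline.
// Assumes a session configured with the Hudi Spark SQL extensions and a table `test_tbl`.
val spark = SparkSession.builder().appName("show-timeline-scenario").getOrCreate()

val startTime = "20240101000000" // placeholder instant inside the active timeline
val endTime   = "20240102000000" // placeholder instant inside the active timeline

val rows = spark.sql(
  s"call show_timeline(table => 'test_tbl', startTime => '$startTime', endTime => '$endTime')"
).collect()

// Every returned instant should fall within [startTime, endTime].
rows.foreach { row =>
  val instantTime = row.getString(0)
  assert(instantTime >= startTime && instantTime <= endTime,
    s"Instant $instantTime should be in range [$startTime, $endTime]")
}
```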
```scala
// This way, if all archived instants are older than the active timeline's max instant,
// the archived timeline will be empty and won't load anything, avoiding unnecessary loading.
// Instead of getArchivedTimeline() which loads with LoadMode.ACTION, we use the startTs
// constructor which loads with LoadMode.METADATA, and then load specific details (PLAN for compactions).
```
LoadMode.METADATA does not include the plan; are you saying LoadMode.PLAN or LoadMode.FULL?
Even if we use the constructor ArchivedTimelineV2(HoodieTableMetaClient metaClient, String startTs), the whole timeline would still be loaded with filtering. A better way to avoid the eager loading is to add a new API, TimelineFactory.createArchivedTimeline(HoodieTableMetaClient metaClient, boolean loadingInstants), and in each ArchivedTimeline add a new constructor that accepts the meta client but does not load any instants by default.
Because in the use cases of this PR, we always want lazy loading of the archived timeline, with the load triggered as needed (a caller-side sketch follows below).
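A caller-side sketch of how this proposal might be used (written in Scala for brevity, even though the factory and timeline classes live in Java). The boolean createArchivedTimeline overload is the suggestion above and does not exist yet; the factory lookup path and the range-load call are likewise assumptions made only for illustration.

```scala
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline

// Proposed lazy path: construct the archived timeline without loading any instants,
// then load a bounded range only when archived history is actually needed.
def lazyArchivedTimeline(metaClient: HoodieTableMetaClient,
                         startTs: String,
                         endTs: String): HoodieArchivedTimeline = {
  // Assumed factory lookup; the real wiring would go through the table's TimelineFactory.
  val factory = metaClient.getTimelineLayout.getTimelineFactory
  // Proposed overload: `false` means "do not eagerly load instants".
  val archived = factory.createArchivedTimeline(metaClient, false)
  // Assumed lazy-load trigger, scoped to the requested range.
  archived.loadInstantDetailsInMemory(startTs, endTs)
  archived
}
```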
@danny0405 - ArchivedTimelineV2(HoodieTableMetaClient metaClient, String startTs) - we are using the startTs as the firstInstantTime from the active timeline, so all instants in the archived timeline will be before this starting point and we won't be reading the records inside the files.
The archived files will be skipped (based on the start timestamp comparison of each file). Tested locally that we are not reading the records inside the files like before.
CC: @nsivabalan
Also, we are using the empty archived timeline only in the SQL procedure; currently we do not have a use case outside of this SQL procedure.
"The archived files will be skipped (based on the start timestamp comparison of each file). Tested locally that we are not reading the records inside the files like before."
For V2 this might be true, but not for V1?
```java
}

// Sort files in reverse chronological order if needed (newest first for limit queries)
if (stoppable != null && stoppable.needsReverseOrder()) {
```
If we do not have a good way to plug in the limit logic simply and cleanly, maybe we just add a separate method, ArchivedTimelineLoader.loadInstants, with an explicit StoppableRecordConsumer param. The benefits:
- get rid of the null check and instanceof check;
- always sort the files in reverse chronological order;
- read the files in a single thread instead of in parallel.
Reading with a limit is essentially a range query instead of a full scan. By doing this, we can freely plug in the logic required for the limit while still keeping the basic scan query efficient and clean. We can always come back to this for a better abstraction when it is necessary. (A rough sketch of the idea follows below.)
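To make the suggested control flow concrete, here is a self-contained sketch; the types and parameters are made up for illustration and are not Hudi's actual loader API, and the file-name ordering assumption is stated in the comments.

```scala
// Conceptual sketch only: archive files are sorted newest-first and read one at a time on a
// single thread, and the scan stops as soon as the consumer signals the limit was reached.
def loadWithLimit[R](archiveFiles: Seq[String],
                     readRecords: String => Iterator[R],
                     accept: R => Unit,
                     shouldStop: () => Boolean): Unit = {
  // Illustration-only assumption: file names sort chronologically, so a reverse sort
  // yields reverse chronological order (newest first), which is what a limit query wants.
  val newestFirst = archiveFiles.sorted(Ordering[String].reverse)
  val it = newestFirst.iterator
  while (it.hasNext && !shouldStop()) {
    // Single-threaded, file-by-file read; stop mid-file once enough records were consumed.
    readRecords(it.next()).takeWhile(_ => !shouldStop()).foreach(accept)
  }
}
```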
Thank you for the suggestion. My concern is that by adding a separate method, we would be duplicating the same logic and adding maintenance overhead.
Makes sense. Accepted the suggestion and made the relevant changes.
Addressed the comments and rebased with master to resolve conflicts.
…d times of archived timeline
```scala
 * Helper method to create a table with all types of commits for comprehensive testing.
 * Creates: completed/inflight commits, deltacommits (MOR), clean, compaction, clustering, insert overwrite, rollback
 */
private def setupTableWithAllCommitTypes(tableName: String, tableLocation: String, tableType: String): Map[String, String] = {
```
Didn't I point you to test code where we prepare the timeline directly?
We should avoid doing end-to-end functional setup if not really required.
Refer to TestArchivedTimelineV1.createInstants() and createAdditionalInstants(),
and TestArchivedTimelineV2.writeArchivedTimeline(...).
```scala
// 2. Inflight commits - create and leave one inflight by interrupting
spark.sql(s"insert into $tableName values(4, 'a4', 40, 4000)")
Thread.sleep(100) // Small delay
```
Why do we need the sleep in so many places?
```scala
// Scenario 1: limit 50
val limit50Result = spark.sql(s"call show_timeline(table => '$tableName', limit => 50)").collect()
assert(limit50Result.length <= 50, s"Scenario 1: Should have at most 50 entries, got ${limit50Result.length}")
```
Can we validate the instant times as well?
It would be much easier if we had prepared the timeline in the test.
Something to think about.
```scala
result6.foreach { row =>
  val instantTime = row.getString(0)
  assert(instantTime >= startTime && instantTime <= endTime,
    s"Scenario 6: Entry $instantTime should be in range [$startTime, $endTime]")
```
Let's revisit all validations and make them tight.
```scala
val archivedEnd = archivedTimes.last
val result11 = spark.sql(s"call show_timeline(table => '$tableName', startTime => '$archivedStart', endTime => '$archivedEnd')").collect()
result11.foreach { row =>
  assert(row.getString(6) == "ACTIVE" || row.getString(0) >= archivedStart,
```
Shouldn't result11 be expected to be an empty list?
| println(s"V1 COW test: Table version after downgrade: $versionAfter") | ||
| // // Verify table version was downgraded to SIX | ||
| // assert(versionAfter == HoodieTableVersion.SIX, | ||
| // s"V1 COW test: Table version should be SIX after downgrade, but got $versionAfter (was $versionBefore)") |
Why is this commented out?
| s"V1 COW: Archived timeline should only have COMPLETED states, got: ${archivedStates.mkString(", ")}") | ||
| } | ||
|
|
||
| // Verify specific commit types exist for V1 COW |
Why do this for V1 only?
…loading with limit param
Describe the issue this Pull Request addresses
This PR introduces a new comprehensive show_timeline procedure for Hudi Spark SQL that provides detailed timeline information for all table operations. The procedure displays timeline instants including commits, deltacommits, compactions, clustering, cleaning, and rollback operations, with support for both active and archived timelines and completed/pending state instants.

Summary and Changelog
- Comprehensive timeline view: Shows all timeline instants with detailed metadata including state transitions (REQUESTED, INFLIGHT, COMPLETED)
- Time-based filtering: Support for startTime and endTime parameters to filter results within specific time ranges
- Archive timeline support: showArchived parameter to include archived timeline data for a complete historical view
- Generic SQL filtering: filter parameter supporting SQL expressions for flexible result filtering
- Rich metadata output: Includes formatted timestamps, rollback information, and table type details
The procedure replaces multiple fragmented timeline-related procedures with a single unified interface that provides both pending and completed instant information with partition-specific metadata support.
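For illustration, invocations could look like the following; this assumes a SparkSession (spark) with the Hudi SQL extensions enabled, the table name and instant times are placeholders, and the parameter names are the ones described in this PR.

```scala
// Basic call: latest instants from the active timeline, capped by limit.
spark.sql("call show_timeline(table => 'hudi_tbl', limit => 20)").show(false)

// Time-range query over both active and archived timelines, with an extra SQL filter.
spark.sql(
  """call show_timeline(
    |  table => 'hudi_tbl',
    |  startTime => '20240101000000',
    |  endTime => '20240201000000',
    |  showArchived => true,
    |  filter => "action = 'commit' and state = 'COMPLETED'")""".stripMargin
).show(false)
```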
Impact-related changelog details:
- show_timeline with parameters (table, path, limit, showArchived, filter, startTime, endTime)
- Output columns: instant_time, action, state, requested_time, inflight_time, completed_time, timeline_type, rollback_info

Impact
User-facing Features:
Performance Impact:
Risk Level: Low
Verification performed
Documentation Update
- show_timeline procedure to Hudi Spark SQL procedures documentation

Contributor's checklist