feat: [HUDI-9766] Support for show_timeline Procedure with appropriate start and end time for both active and archive timelines #14261
base: master
Conversation
// test show timeline after compaction with archived timeline
val timelineResultAfterCompactionDf = spark.sql(s"call show_timeline(table => '$tableName', showArchived => true)")
timelineResultAfterCompactionDf.show(false)
val activeRollbackInfoMap = getRolledBackInstantInfo(metaClient.getActiveTimeline, metaClient)
val archivedRollbackInfoMap = if (showArchived) {
  val archivedTimeline = metaClient.getArchivedTimeline.reload()
This line will trigger loading of the whole timeline's instants twice: once from metaClient.getArchivedTimeline with load mode LoadMode.ACTION, and a second time from .reload(), also with load mode LoadMode.ACTION. Note that for V2, only FULL mode supports reading the plans (rollback, cleaning and compaction).
Maybe we should just create an empty timeline there and invoke these APIs on it.
val limitedInstants = if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
  sortedInstants
} else {
  sortedInstants.take(limit)
If we apply a final limit on the timeline instants, do we still need to add a limit to the timeline API?
nsivabalan left a comment:
Sharing some feedback for now; will review the rest later today.
void loadCompletedInstantDetailsInMemory();

void loadCompletedInstantDetailsInMemory(String startTs, String endTs);
Do we have UTs for these?
record -> {
  Object action = record.get(ACTION_STATE);
  return record.get(ACTION_TYPE_KEY).toString().equals(HoodieTimeline.COMPACTION_ACTION)
      && (action == null || org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT.toString().equals(action.toString()));
Is the action expected to be null for a completed entry?
Let's add a comment: // Older files don't have action state set.
    metaClient, null, Option.empty(), LoadMode.PLAN, commitsFilter, loader);
List<HoodieInstant> collectedInstants = loader.getCollectedInstants();
List<HoodieInstant> newInstants = collectedInstants.stream()
    .filter(instant -> !getInstants().contains(instant))
Can we call getInstants() once outside the loop and reuse it here? Currently, this will result in N array lists.
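The hoisting the reviewer asks for can be sketched like this. This is a minimal, self-contained sketch: plain strings stand in for HoodieInstant, and `newInstants` is a hypothetical helper, not Hudi's API.

```java
import java.util.*;
import java.util.stream.*;

public class HoistExample {
    // Hoist the existing-instants lookup out of the filter so the instant
    // list is materialized once instead of once per streamed element.
    public static List<String> newInstants(List<String> collected, List<String> existing) {
        Set<String> existingSet = new HashSet<>(existing); // built once, O(1) probes
        return collected.stream()
            .filter(i -> !existingSet.contains(i))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> collected = Arrays.asList("002", "003", "004");
        List<String> existing = Arrays.asList("001", "002", "003");
        System.out.println(newInstants(collected, existing)); // prints [004]
    }
}
```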
return instantsInRange.values()
    .stream()
    .flatMap(Collection::stream)
    .sorted()
Don't we need sorting based on instant times followed by states, i.e. REQUESTED followed by INFLIGHT followed by COMPLETED?
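The ordering the reviewer describes can be expressed with a composed comparator. The `Instant` record and `State` enum below are simplified stand-ins for HoodieInstant and its state; the real comparators live in Hudi's timeline package.

```java
import java.util.*;

public class InstantOrdering {
    // Simplified stand-ins for HoodieInstant and HoodieInstant.State.
    public enum State { REQUESTED, INFLIGHT, COMPLETED }
    public record Instant(String time, State state) {}

    // Order by instant time first, then by state so that for the same
    // instant time REQUESTED sorts before INFLIGHT before COMPLETED.
    public static void sortByTimeThenState(List<Instant> instants) {
        instants.sort(Comparator.comparing(Instant::time)
            .thenComparing(i -> i.state().ordinal()));
    }

    public static void main(String[] args) {
        List<Instant> instants = new ArrayList<>(List.of(
            new Instant("002", State.COMPLETED),
            new Instant("001", State.INFLIGHT),
            new Instant("002", State.REQUESTED),
            new Instant("001", State.REQUESTED)));
        sortByTimeThenState(instants);
        // prints: 001 REQUESTED, 001 INFLIGHT, 002 REQUESTED, 002 COMPLETED
        instants.forEach(i -> System.out.println(i.time() + " " + i.state()));
    }
}
```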
entryList.sort(new ArchiveFileVersionComparator());

for (StoragePathInfo fs : entryList) {
  if (stoppable != null && stoppable.shouldStop()) {
Let's sync f2f on this; I have some questions/clarifications.
try {
  return LSMTimeline.getMaxInstantTime(fileName);
} catch (Exception e) {
  return "";
Why return EMPTY_STRING here?
  }
}
filteredFiles.parallelStream().forEach(fileName -> {
  if (stoppable != null && stoppable.shouldStop()) {
If we are doing a parallel stream, we can't guarantee the latest N with limit N, right? There could be non-contiguous entries.
Actually, we can't marry limit and parallel streams. So whenever we want to apply a limit, let's avoid parallel processing.
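A minimal illustration of keeping a "latest N" read sequential: a sequential stream over a reverse-sorted list short-circuits after N contiguous entries from the newest end, whereas stopping a parallel forEach via an external flag can leave non-contiguous gaps. The names here are illustrative, not the PR's actual code.

```java
import java.util.*;
import java.util.stream.*;

public class LatestN {
    // Take the latest n entries: sort newest-first, then limit sequentially.
    public static List<String> latest(List<String> files, int n) {
        return files.stream()                   // NOT parallelStream()
            .sorted(Comparator.reverseOrder())  // newest first
            .limit(n)                           // short-circuit after n entries
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> files = IntStream.range(0, 100)
            .mapToObj(i -> String.format("file_%03d", i))
            .collect(Collectors.toList());
        System.out.println(latest(files, 3)); // prints [file_099, file_098, file_097]
    }
}
```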
)
val combinedEntries = (activeEntries ++ archivedEntries)
  .sortWith((a, b) => {
    val timePriorityOrder = a.getString(0).compareTo(b.getString(0))
Let's use RequestedTimeBasedComparator rather than coming up with our own comparator.
  })

if (startTime.trim.nonEmpty && endTime.trim.nonEmpty) {
  combinedEntries
Let's try to apply code reuse wherever possible.
rollbackInstants.asScala.foreach { rollbackInstant =>
  try {
    if (rollbackInstant.isInflight) {
Let's do:
if (!rollbackInstant.isComplete...) {
private def getRollbackInfo(instant: HoodieInstant, timeline: HoodieTimeline, rollbackInfoMap: Map[String, List[String]], metaClient: HoodieTableMetaClient): String = {
  try {
    if (HoodieTimeline.ROLLBACK_ACTION.equalsIgnoreCase(instant.getAction)) {
      if (instant.isInflight) {
Same comment as above.
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase

class TestShowTimelineProcedure extends HoodieSparkSqlTestBase {
Let's try to cover all the cases listed below, for both v1 and v2:
- limit 50.
- both active and archived with limit 20, where active contains 40.
- both active and archived with limit 20, where active contains 10.
- start within active timeline.
- end within active timeline.
- start and end within active timeline.
- start in archived, but archived not explicitly enabled.
- start in archived, archived enabled.
- start in archived and end in active timeline, archived not enabled.
- start in archived and end in active timeline, archived enabled.
- start and end in archived, archived not enabled.
- start and end in archived, archived enabled.

And across instant types:
- completed commits.
- inflight commits.
- completed dc (mor).
- inflight dc (mor).
- clean commits.
- clean inflight commits.
- completed compaction.
- inflight compaction.
- completed replace commits -> clustering.
- inflight replace commits -> clustering.
- completed replace commits -> insert overwrite.
- inflight replace commits -> insert overwrite.
- completed rollback.
- pending rollback.

In the active timeline, we should have a mix of inflight and completed; in archived, we can only have completed, and that's fine.
// This way, if all archived instants are older than the active timeline's max instant,
// the archived timeline will be empty and won't load anything, avoiding unnecessary loading.
// Instead of getArchivedTimeline() which loads with LoadMode.ACTION, we use the startTs
// constructor which loads with LoadMode.METADATA, and then load specific details (PLAN for compactions).
LoadMode.METADATA does not include the plan; are you referring to LoadMode.PLAN or LoadMode.FULL?
Even if we use the constructor ArchivedTimelineV2(HoodieTableMetaClient metaClient, String startTs), the whole timeline would still be loaded with filtering. A better way to avoid the eager loading is to add a new API, TimelineFactory.createArchivedTimeline(HoodieTableMetaClient metaClient, boolean loadingInstants), and in each ArchivedTimeline add a new constructor that accepts the meta client but does not load any instants by default.
Because in the use cases of this PR, we always want lazy loading of the archived timeline, with the load triggered as needed.
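The construct-without-loading shape being suggested can be sketched as below. `ArchivedTimeline` here is a simplified stand-in used to show the "eager vs. lazy" constructor split, not Hudi's real class.

```java
import java.util.*;
import java.util.function.Supplier;

public class LazyTimeline {
    // Stand-in for an archived timeline that can be built without
    // eagerly loading instants; loading is triggered on first access.
    public static class ArchivedTimeline {
        private final Supplier<List<String>> loader;
        private List<String> instants; // null until loaded

        public ArchivedTimeline(boolean loadInstants, Supplier<List<String>> loader) {
            this.loader = loader;
            if (loadInstants) {
                this.instants = loader.get(); // eager path (current behaviour)
            }
        }

        public List<String> getInstants() {
            if (instants == null) {
                instants = loader.get(); // lazy path, triggered as needed
            }
            return instants;
        }
    }

    public static void main(String[] args) {
        int[] loads = {0};
        ArchivedTimeline tl = new ArchivedTimeline(false, () -> {
            loads[0]++;
            return List.of("001", "002");
        });
        System.out.println(loads[0]);                // prints 0 (nothing loaded yet)
        System.out.println(tl.getInstants().size()); // prints 2
        System.out.println(loads[0]);                // prints 1 (loaded exactly once)
    }
}
```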
}

// Sort files in reverse chronological order if needed (newest first for limit queries)
if (stoppable != null && stoppable.needsReverseOrder()) {
If we do not have a good way to plug in the limit logic simply and cleanly, maybe we should just add a separate method, ArchivedTimelineLoader.loadInstants, with an explicit StoppableRecordConsumer param. The benefits:
- get rid of the null check and instanceof check;
- always sort the files in reverse chronological order;
- read the files in a single thread instead of in parallel.
Reading with a limit is somewhat of a range query rather than a full scan; by doing this, we can freely plug in the logic required for the limit while keeping the basic scan query efficient and clean. We can always come back to this for a better abstraction when necessary.
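A sketch of that dedicated loader: files sorted newest-first, read on a single thread, stopping as soon as the consumer reports it has enough. `StoppableRecordConsumer`, `loadWithLimit`, and `limitConsumer` are simplified stand-ins, not the PR's actual types.

```java
import java.util.*;
import java.util.function.Consumer;

public class StoppableLoad {
    // Simplified stand-in for the consumer type introduced in this PR.
    public interface StoppableRecordConsumer extends Consumer<String> {
        boolean shouldStop();
    }

    // Dedicated loader: always reverse chronological, always single-threaded.
    public static List<String> loadWithLimit(List<String> files, StoppableRecordConsumer consumer) {
        List<String> visited = new ArrayList<>();
        List<String> sorted = new ArrayList<>(files);
        sorted.sort(Comparator.reverseOrder()); // newest first
        for (String f : sorted) {               // single thread, no parallelStream
            if (consumer.shouldStop()) {
                break;                          // range-style read, not a full scan
            }
            consumer.accept(f);
            visited.add(f);
        }
        return visited;
    }

    // Helper consumer that stops after collecting n records into sink.
    public static StoppableRecordConsumer limitConsumer(int n, List<String> sink) {
        return new StoppableRecordConsumer() {
            public void accept(String f) { sink.add(f); }
            public boolean shouldStop() { return sink.size() >= n; }
        };
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        List<String> files = List.of("f1", "f2", "f3", "f4");
        System.out.println(loadWithLimit(files, limitConsumer(2, sink))); // prints [f4, f3]
    }
}
```

This keeps the stop condition out of the common scan path entirely, which is the reviewer's point about keeping the basic scan query clean.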
public void loadCompactionDetailsInMemory(int limit) {
  loadInstantsWithLimit(limit, true,
      record -> {
        Object action = record.get(ACTION_STATE);
Here, action should actually be the state; we should rename the variable to be clear.
public void loadCompletedInstantDetailsInMemory(int limit) {
  loadInstantsWithLimit(limit, true,
      record -> {
        Object action = record.get(ACTION_STATE);
Same here.

Describe the issue this Pull Request addresses

This PR introduces a new comprehensive show_timeline procedure for Hudi Spark SQL that provides detailed timeline information for all table operations. The procedure displays timeline instants including commits, deltacommits, compactions, clustering, cleaning, and rollback operations, with support for both active and archived timelines and completed/pending state instants.

Summary and Changelog

Comprehensive timeline view: shows all timeline instants with detailed metadata including state transitions (REQUESTED, INFLIGHT, COMPLETED).
Time-based filtering: support for startTime and endTime parameters to filter results within specific time ranges.
Archive timeline support: showArchived parameter to include archived timeline data for a complete historical view.
Generic SQL filtering: filter parameter supporting SQL expressions for flexible result filtering.
Rich metadata output: includes formatted timestamps, rollback information, and table type details.

The procedure replaces multiple fragmented timeline-related procedures with a single unified interface that provides both pending and completed instant information with partition-specific metadata support.

Impact-related changelog details:
- New procedure show_timeline with parameters (table, path, limit, showArchived, filter, startTime, endTime).
- Output columns: instant_time, action, state, requested_time, inflight_time, completed_time, timeline_type, rollback_info.

Impact
User-facing Features:
Performance Impact:
Risk Level
Low
Verification performed
Documentation Update
- Add the show_timeline procedure to Hudi Spark SQL procedures documentation.
Contributor's checklist