Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ffe4ca1
[HUDI-9766] Support for show_timeline procedure with time-based filte…
vamshikrishnakyatham Aug 29, 2025
1827abc
[HUDI-9766] Support for show_timeline procedure with time-based filte…
vamshikrishnakyatham Aug 29, 2025
09594b3
Merge remote-tracking branch 'upstream/master' into HUDI-9766-Support…
vamshikrishnakyatham Aug 29, 2025
afd7d6b
[HUDI-9766] Support for show_timeline procedure with time-based filte…
vamshikrishnakyatham Aug 29, 2025
521ed76
optimization on driver for reading instants
vamshikrishnakyatham Sep 19, 2025
a0d33b5
Merge branch 'master' of ssh://github.com/apache/hudi into HUDI-9766-…
Nov 8, 2025
c0a4a97
Merge branch 'master' of ssh://github.com/apache/hudi into pavi-hudi-…
Nov 13, 2025
45aec77
HUDI-9766 - Added instant loader for archived timeline v2
Nov 17, 2025
5c0a7ae
HUDI-9766 - Added instant loader for archived timeline v2 and added t…
Nov 18, 2025
77d9162
HUDI-9766 - populating timestamps for archived timeline entries from H…
Nov 19, 2025
3d71552
HUDI-9766 - using existing methods for filtering time range instants …
Nov 19, 2025
c2daa7a
HUDI-9766 - show_timeline new tests for start and end time
Nov 19, 2025
47eee42
HUDI-9766 - show_timeline - addressed PR comments, bug fixes around a…
Nov 21, 2025
3ce75dd
HUDI-9766 - show_timeline new tests for start and end time
Nov 21, 2025
3bd5aa3
HUDI-9766 - show_timeline - addressed PR comments and added extensive…
Nov 25, 2025
81be85d
HUDI-9766 - show_timeline - timeline loaders extra check removal and …
Nov 25, 2025
f97ff88
Merge branch 'master' of ssh://github.com/apache/hudi into pavi-hudi-…
Nov 25, 2025
0f7d5c8
HUDI-9766 - show_timeline - bug fix for loading based on start and en…
Nov 25, 2025
bc8da37
HUDI-9766 - show_timeline - removed instantloaderlimit with func over…
Nov 26, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,16 @@ public interface HoodieArchivedTimeline extends HoodieTimeline {

/**
 * Loads the details of completed instants into memory.
 * NOTE(review): this overload takes neither a range nor a limit; which instants it covers is
 * decided by the implementing class - confirm against the implementations.
 */
void loadCompletedInstantDetailsInMemory();

/**
 * Loads the details of completed instants into memory, restricted to the time range
 * described by the two timestamps.
 * NOTE(review): whether the boundaries are inclusive depends on the range filter chosen by the
 * implementing class - confirm against the implementations.
 *
 * @param startTs start of the instant time range
 * @param endTs   end of the instant time range
 */
void loadCompletedInstantDetailsInMemory(String startTs, String endTs);

/**
 * Loads the details of completed instants into memory, bounded by a count instead of a time range.
 *
 * @param limit upper bound on the number of instants to load
 */
void loadCompletedInstantDetailsInMemory(int limit);

/**
 * Loads compaction details for a single compaction instant into memory.
 *
 * @param compactionInstantTime instant time of the compaction to load
 */
void loadCompactionDetailsInMemory(String compactionInstantTime);

/**
 * Loads compaction details into memory for the time range described by the two timestamps.
 * NOTE(review): boundary inclusivity is decided by the implementing class - confirm.
 *
 * @param startTs start of the instant time range
 * @param endTs   end of the instant time range
 */
void loadCompactionDetailsInMemory(String startTs, String endTs);

/**
 * Loads compaction details into memory, bounded by a count instead of a time range.
 *
 * @param limit upper bound on the number of instants to load
 */
void loadCompactionDetailsInMemory(int limit);

/**
 * Removes the in-memory details held for the given instant.
 *
 * @param instantTime instant time whose cached details should be dropped
 */
void clearInstantDetailsFromMemory(String instantTime);

/**
 * Removes the in-memory details held for instants in the given time range.
 * NOTE(review): boundary inclusivity is decided by the implementing class - confirm.
 *
 * @param startTs start of the instant time range
 * @param endTs   end of the instant time range
 */
void clearInstantDetailsFromMemory(String startTs, String endTs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,34 @@ record -> {
});
}

/**
 * Loads compaction details into memory for at most {@code limit} instants.
 *
 * <p>A record qualifies when its action type is the compaction action and its state field is
 * either absent or equal to the inflight state.
 *
 * @param limit upper bound on the number of instants to load
 */
@Override
public void loadCompactionDetailsInMemory(int limit) {
  final Function<GenericRecord, Boolean> inflightCompactionOnly = archivedRecord -> {
    Object state = archivedRecord.get(ACTION_STATE);
    String actionType = archivedRecord.get(ACTION_TYPE_KEY).toString();
    if (!actionType.equals(HoodieTimeline.COMPACTION_ACTION)) {
      return false;
    }
    if (state == null) {
      // Records without a state value are accepted as-is.
      return true;
    }
    return org.apache.hudi.common.table.timeline.HoodieInstant.State.INFLIGHT.toString().equals(state.toString());
  };
  loadInstantsWithLimit(limit, true, inflightCompactionOnly);
}

/**
 * Loads details of completed instants into memory for the range built from
 * {@code startTs} and {@code endTs} via {@code ClosedClosedTimeRangeFilter}.
 *
 * <p>A record qualifies when its state field is absent or equal to the completed state.
 *
 * @param startTs start of the instant time range
 * @param endTs   end of the instant time range
 */
@Override
public void loadCompletedInstantDetailsInMemory(String startTs, String endTs) {
  ClosedClosedTimeRangeFilter rangeFilter = new ClosedClosedTimeRangeFilter(startTs, endTs);
  loadInstants(rangeFilter, null, true,
      archivedRecord -> {
        Object state = archivedRecord.get(ACTION_STATE);
        if (state == null) {
          // Records without a state value are accepted as-is.
          return true;
        }
        return org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED.toString().equals(state.toString());
      });
}

/**
 * Loads details of completed instants into memory for at most {@code limit} instants.
 *
 * <p>A record qualifies when its state field is absent or equal to the completed state.
 *
 * @param limit upper bound on the number of instants to load
 */
@Override
public void loadCompletedInstantDetailsInMemory(int limit) {
  final Function<GenericRecord, Boolean> completedOnly = archivedRecord -> {
    Object state = archivedRecord.get(ACTION_STATE);
    return state == null
        ? true
        : org.apache.hudi.common.table.timeline.HoodieInstant.State.COMPLETED.toString().equals(state.toString());
  };
  loadInstantsWithLimit(limit, true, completedOnly);
}

@Override
public void clearInstantDetailsFromMemory(String instantTime) {
this.readCommits.remove(instantTime);
Expand Down Expand Up @@ -249,6 +277,12 @@ private List<HoodieInstant> loadInstants(HoodieArchivedTimeline.TimeRangeFilter
.stream().flatMap(Collection::stream).sorted().collect(Collectors.toList());
}

/**
 * Runs the timeline loader with a count-capped callback instead of a time range filter.
 *
 * <p>The loader is invoked with no range filter ({@code null}), an empty option,
 * {@code LoadMode.PLAN}, the supplied record filter, and a fresh {@link InstantsLoaderWithLimit}.
 * The instants collected by that callback are not returned to the caller.
 *
 * @param limit              upper bound handed to the callback
 * @param loadInstantDetails forwarded to the callback, which passes it on to {@code readCommit}
 * @param commitsFilter      record-level filter handed to the timeline loader
 */
private void loadInstantsWithLimit(int limit, boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter) {
  timelineLoader.loadInstants(
      metaClient,
      null,
      Option.empty(),
      LoadMode.PLAN,
      commitsFilter,
      new InstantsLoaderWithLimit(loadInstantDetails, limit));
}

/**
* Callback to read instant details.
*/
Expand All @@ -274,6 +308,39 @@ public Map<String, List<HoodieInstant>> getInstantsInRangeCollected() {
}
}

/**
 * Callback that turns archived records into instants and records at most {@code limit} of them,
 * grouped by requested time.
 *
 * <p>Note that {@code readCommit} is invoked before the lock is taken, so when callbacks run
 * concurrently it may be called for more than {@code limit} records even though no more than
 * {@code limit} instants end up in the collected map.
 */
public class InstantsLoaderWithLimit implements BiConsumer<String, GenericRecord> {
  private final int limit;
  private final boolean loadInstantDetails;
  private final Map<String, List<HoodieInstant>> instantsInRange = new ConcurrentHashMap<>();
  // Only modified while holding this object's monitor; volatile so the unlocked pre-check sees updates.
  private volatile int loadedCount = 0;

  private InstantsLoaderWithLimit(boolean loadInstantDetails, int limit) {
    this.loadInstantDetails = loadInstantDetails;
    this.limit = limit;
  }

  /**
   * Reads one record and, if it yields an instant and the cap is not yet reached, records it.
   *
   * @param instantTime instant time passed through to {@code readCommit}
   * @param record      archived record passed through to {@code readCommit}
   */
  @Override
  public void accept(String instantTime, GenericRecord record) {
    if (loadedCount < limit) {
      Option<HoodieInstant> parsed = readCommit(instantTime, record, loadInstantDetails, null);
      if (!parsed.isPresent()) {
        return;
      }
      synchronized (this) {
        // Re-check under the lock: another callback may have filled the last slot meanwhile.
        if (loadedCount >= limit) {
          return;
        }
        HoodieInstant instant = parsed.get();
        instantsInRange.computeIfAbsent(instant.requestedTime(), key -> new ArrayList<>()).add(instant);
        loadedCount++;
      }
    }
  }

  /**
   * @return the live map of collected instants, keyed by requested time
   */
  public Map<String, List<HoodieInstant>> getInstantsInRangeCollected() {
    return instantsInRange;
  }
}

private Option<HoodieInstant> readCommit(String instantTime, GenericRecord record, boolean loadDetails,
TimeRangeFilter timeRangeFilter) {
final String action = record.get(ACTION_TYPE_KEY).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,24 @@ record -> record.get(ACTION_ARCHIVED_META_FIELD).toString().equals(COMMIT_ACTION
);
}

/**
 * Loads compaction details into memory for at most {@code limit} instants, using
 * {@code LoadMode.PLAN}.
 *
 * <p>A record qualifies when its action field equals the commit action and its plan field is set.
 *
 * @param limit upper bound on the number of instants to load
 */
@Override
public void loadCompactionDetailsInMemory(int limit) {
  final Function<GenericRecord, Boolean> commitsWithPlan = archivedRecord -> {
    boolean isCommit = archivedRecord.get(ACTION_ARCHIVED_META_FIELD).toString().equals(COMMIT_ACTION);
    // The plan field is only consulted for commit records.
    return isCommit && archivedRecord.get(PLAN_ARCHIVED_META_FIELD) != null;
  };
  loadInstantsWithLimit(limit, HoodieArchivedTimeline.LoadMode.PLAN, commitsWithPlan);
}

/**
 * Loads details of completed instants into memory for the given time range, using
 * {@code LoadMode.METADATA}.
 *
 * <p>The range is built with {@code ClosedClosedTimeRangeFilter} so that this implementation
 * treats the boundaries the same way as the other archived timeline implementation of this
 * interface method, which also uses the closed-closed filter. Callers therefore get the same
 * boundary handling regardless of which timeline version backs the table.
 *
 * @param startTs start of the instant time range
 * @param endTs   end of the instant time range
 */
@Override
public void loadCompletedInstantDetailsInMemory(String startTs, String endTs) {
  loadInstants(new HoodieArchivedTimeline.ClosedClosedTimeRangeFilter(startTs, endTs), HoodieArchivedTimeline.LoadMode.METADATA);
}

/**
 * Loads details of instants into memory for at most {@code limit} instants, using
 * {@code LoadMode.METADATA} and no record-level filtering.
 *
 * @param limit upper bound on the number of instants to load
 */
@Override
public void loadCompletedInstantDetailsInMemory(int limit) {
  // Every record is accepted; only the count cap restricts what is loaded.
  final Function<GenericRecord, Boolean> acceptEveryRecord = archivedRecord -> true;
  loadInstantsWithLimit(limit, HoodieArchivedTimeline.LoadMode.METADATA, acceptEveryRecord);
}

@Override
public void clearInstantDetailsFromMemory(String instantTime) {
this.readCommits.remove(instantTime);
Expand Down Expand Up @@ -242,6 +260,47 @@ private List<HoodieInstant> loadInstants(
return result;
}

/**
 * Runs the timeline loader with a count-capped callback instead of a time range filter.
 *
 * <p>No range filter is supplied ({@code null}); the number of records processed is bounded by a
 * fresh {@link InstantsLoaderWithLimit} created for this call.
 *
 * @param limit         upper bound handed to the callback
 * @param loadMode      load mode passed both to the timeline loader and to the callback
 * @param commitsFilter record-level filter handed to the timeline loader
 */
private void loadInstantsWithLimit(int limit, HoodieArchivedTimeline.LoadMode loadMode,
                                   Function<GenericRecord, Boolean> commitsFilter) {
  timelineLoader.loadInstants(
      metaClient,
      null,
      loadMode,
      commitsFilter,
      new InstantsLoaderWithLimit(limit, loadMode));
}

/**
 * Callback that reads instant details for at most {@code limit} records.
 *
 * <p>Implements {@link BiConsumer} so it can be handed to the timeline loader, which calls
 * {@link #accept(String, GenericRecord)} with (instantTime, record) pairs while loading.
 *
 * <p>A slot is reserved under the lock <em>before</em> the record is read. This keeps the cap
 * strict when callbacks run concurrently: checking the counter, reading, and only then
 * incrementing would let several callers pass the check at once and read more than
 * {@code limit} records.
 */
private class InstantsLoaderWithLimit implements BiConsumer<String, GenericRecord> {
  private final int limit;
  private final HoodieArchivedTimeline.LoadMode loadMode;
  // Only modified while holding this object's monitor; volatile so the unlocked fast path sees updates.
  private volatile int loadedCount = 0;

  private InstantsLoaderWithLimit(int limit, HoodieArchivedTimeline.LoadMode loadMode) {
    this.limit = limit;
    this.loadMode = loadMode;
  }

  /**
   * Reads the given record unless the cap has already been reached.
   *
   * @param instantTime instant time passed through to {@code readCommit}
   * @param record      archived record passed through to {@code readCommit}
   */
  @Override
  public void accept(String instantTime, GenericRecord record) {
    // Lock-free fast path once the cap has been reached.
    if (loadedCount >= limit) {
      return;
    }
    // Atomically claim a slot so that no more than `limit` callers proceed to read.
    synchronized (this) {
      if (loadedCount >= limit) {
        return;
      }
      loadedCount++;
    }
    // The read itself stays outside the lock so concurrent callbacks are not serialized.
    Option<BiConsumer<String, GenericRecord>> instantDetailsConsumer = Option.ofNullable(getInstantDetailsFunc(loadMode));
    readCommit(instantTime, record, instantDetailsConsumer);
  }
}

@Override
public HoodieTimeline getWriteTimeline() {
// filter in-memory instants
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ object HoodieProcedures {
,(ShowCleansProcedure.NAME, ShowCleansProcedure.builder)
,(ShowCleansPartitionMetadataProcedure.NAME, ShowCleansPartitionMetadataProcedure.builder)
,(ShowCleansPlanProcedure.NAME, ShowCleansPlanProcedure.builder)
,(ShowTimelineProcedure.NAME, ShowTimelineProcedure.builder)
,(SetAuditLockProcedure.NAME, SetAuditLockProcedure.builder)
,(ShowAuditLockStatusProcedure.NAME, ShowAuditLockStatusProcedure.builder)
,(ValidateAuditLockProcedure.NAME, ValidateAuditLockProcedure.builder)
Expand Down
Loading