Skip to content

Conversation

@Annaseli
Copy link
Contributor

@Annaseli Annaseli commented Nov 27, 2025

Closes #9731

Change Description

This PR introduces the infrastructure required for supporting async commit and merge operations.
Part of the full async commit/merge implementation in the Enterprise PR.

Changes

  1. Added four new API endpoints.
  2. Added an AsyncOperationsHandler interface.
  3. Added an OSS no-op implementation of the AsyncOperationsHandler interface.
  4. Updated the catalog protobuf to include the new async-related operations.
  5. Added a status_code field to the generic Task protobuf and refactored error handling to use it.

Not Included in This PR (will be handled in follow-up PRs)

  1. Integration tests.
  2. Handling of expired operations.
  3. Timeout handling.

Testing Details

Validated using the test_async_endpoints.sh script for sanity check (tested from the Enterprise branch).

@Annaseli Annaseli requested a review from a team November 27, 2025 01:18
@Annaseli Annaseli added the include-changelog PR description should be included in next release changelog label Nov 27, 2025
Copy link
Contributor

@Isan-Rivkin Isan-Rivkin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, the feature is exciting!
I didn't do full functional review rather learned from the code your intentions :)

) (*MergeAsyncStatus, error)
}

type OSSAsyncOperationsHandler struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Rename to a meaningful name based on functionality rather than not location ( i.e NoopCommitHandler , DummyCommitHandler, AsyncCommitHandler )

@@ -0,0 +1,14 @@
package catalogerrors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a dedicated package for catalog errors?
Note: there are errors declated inside catalog package such as catalog/ErrImportClosed, if there is a reason for such package (?) we should be consistent with location.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this because I had cyclic dependency issues, but after talking with Barak, I’m updating the implementation and reverting this change.

@@ -0,0 +1,135 @@
package apierrors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why new package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because I had cyclic dependency issues, but after talking with Barak, I’m updating the implementation and reverting this change.

log.WithError(err).WithField("step", step.Name).Errorf("Catalog background task step failed")
task.Done = true
task.Error = err.Error()
apierrors.HandleAPIErrorCallback(ctx, log, nil, nil, err, func(w http.ResponseWriter, r *http.Request, code int, v interface{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like the wrong place to translate error into user facing api error.
It should be at the controller level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that we like to map what the controller will be doing at this point for the current clients, status code will determine the exact error instead of checking the error message.

Suggest to have a api error callback function that can be set on the catalog, this way the controller can set it's error handler func as the catalog error handler callback.
It will add some complexity when reading the code - but better reuse and handle future changes in the controller.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

answered in a comment below.

google.protobuf.Timestamp updated_at = 3;
int64 progress = 4;
string error = 5;
string error_code = 6; // error classification code for HTTP status mapping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need 2 error fields? I find this confusing.
Which error do I need to assert for? What do I expect there? Also the fact that it's just HTTP status code does not preserve information regardless what happened. If all we need is HTTP status code then just translate it at the controller.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

answered in a comment below.

Annaseli and others added 4 commits November 27, 2025 13:44
Remove manually added comments from generated file.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
log.WithError(err).WithField("step", step.Name).Errorf("Catalog background task step failed")
task.Done = true
task.Error = err.Error()
apierrors.HandleAPIErrorCallback(ctx, log, nil, nil, err, func(w http.ResponseWriter, r *http.Request, code int, v interface{}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that we like to map what the controller will be doing at this point for the current clients, status code will determine the exact error instead of checking the error message.

Suggest to have a api error callback function that can be set on the catalog, this way the controller can set it's error handler func as the catalog error handler callback.
It will add some complexity when reading the code - but better reuse and handle future changes in the controller.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assume that with api callback error these errors will return to their place

int64 progress = 4;
string error = 5;
string error_msg = 5;
string error_code = 6; // error classification code for HTTP status mapping
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Will go with status code as the value will match http status code that the original api would have return
  • Status code is an integer

task.Done = true
task.Error = err.Error()
apierrors.HandleAPIErrorCallback(ctx, log, nil, nil, err, func(w http.ResponseWriter, r *http.Request, code int, v interface{}) {
task.ErrorCode = fmt.Sprintf("%d", code)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff is just in ns, but I still prefer to use strconv.Itoa when it comes to int.

@Annaseli
Copy link
Contributor Author

@Isan-Rivkin

Regarding your comments:

In the sync flow, MergeIntoBranch / Commit call Catalog.Merge / Catalog.Commit, which call the Store.Merge and Store.Commit funcs. Errors like 409, 400, 412 reach the controller as proper error objects, allowing handleAPIError to classify them and return the correct HTTP status code.

But in the async flow, the controller submits a task via AsyncOperations.SubmitCommit and AsyncOperations.SubmitMerge, and the actual merge/commit happens later inside RunBackgroundTaskSteps().
Before my change, this function only stored in the kv the error msg:

task.Error = err.Error()

so the original Error() object (and therefore the correct status code) was lost. The async status endpoints had no way to classify the error.

I used HandleAPIErrorCallback inside RunBackgroundTaskSteps() to classify the error before losing the Error() object, and store: Task.ErrorCode, Task.ErrorMsg (renamed from Task.Error)
so the async status endpoints can return the correct HTTP status code.

Using the existing classification logic caused circular dependencies (catalog → api → catalog), so I moved it into a separate apierrors package.

After discussing with Barak, I’m switching to a callback-based approach, so I’ll revert the move and avoid the circular dependency entirely.

Copy link
Contributor

@itaigilo itaigilo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nicely done @Annaseli !

Most of my comments are either nit or about code clarity,
So blocking only since there are a bunch of them.

But other than than, looking good.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this is not part of the catalog module?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved it there thanks (also in enterprise)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not pushed yet...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now it is :)

licenseManager license.Manager,
icebergSyncer icebergsync.Controller,
loginTokenProvider authentication.LoginTokenProvider,
asyncOperations catalog.AsyncOperationsHandler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: asyncOpsHandler catalog.AsyncOperationsHandler?
Plus - placing it right after the actionsHandler makes more sense to me. This way, we start piling things up here.

return
}
ctx := r.Context()
c.LogAction(ctx, "create_commit_async", r, repository, branch, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
c.LogAction(ctx, "create_commit_async", r, repository, branch, "")
c.LogAction(ctx, "commit_async", r, repository, branch, "")

return
}
ctx := r.Context()
c.LogAction(ctx, "create_commit_async_status", r, repository, branch, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
c.LogAction(ctx, "create_commit_async_status", r, repository, branch, "")
c.LogAction(ctx, "commit_async_status", r, repository, branch, "")

metadata = body.Metadata.AdditionalProperties
}

taskID, err := c.AsyncOperations.SubmitCommit(ctx, repository, branch, body.Message, user.Committer(), metadata, body.Date, params.SourceMetarange, swag.BoolValue(body.AllowEmpty), graveler.WithForce(swag.BoolValue(body.Force)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In some places it's called SubmitCommit, in others CreateCommit, or just CommitAsync.
Let's align to one of them (or max two), to avoid confusion.

Copy link
Contributor Author

@Annaseli Annaseli Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had "create_commit" only in
c.LogAction(ctx, "commit", r, repository, branch, "") inside the sync Commit function.
I updated it to c.LogAction(ctx, "commit", …) as well, even though that change wasn’t originally mine.

We don’t have a CreateCommit function, what we do have is CreateCommitAction, for example here:

if !c.authorize(w, r, permissions.Node{
    Permission: permissions.Permission{
        Action:   permissions.CreateCommitAction,
        Resource: permissions.BranchArn(repository, branch),
    },
}) {
    return
}

But it’s part of the permissions layer.

There is also CommitCreation, but that’s a Swagger schema used by the sync Commit flow, so I can’t rename it.

We also have CreateCommitRecord func in the controller but it's not my code.

In my async code, the only relevant parts are:

  • In the controller: CommitAsync
  • In the AsyncOperationsHandler interface: SubmitCommit and GetCommitStatus

Do you think SubmitCommit should be renamed to CreateCommit for consistency, or should we keep the current naming?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining the rational -
Putting it this way, I guess the naming you chose is good.

As noted below, I assume you should avoid renaming existing log names,
So better to align with these.

return nil
}

func NewAsyncOperationsHandler(_ *catalog.Catalog) catalog.AsyncOperationsHandler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: our convention for these factory methods is (or should be) Build*.
This way it's easier to spot these place where Enterprise is overriding the OSS.

Migrator Migrator
Collector stats.Collector
Actions actionsHandler
AsyncOpsHandler catalog.AsyncOperationsHandler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make it asyncOpsHandler?

func (c *Controller) handleAPIErrorCallback(ctx context.Context, w http.ResponseWriter, r *http.Request, err error, cb func(w http.ResponseWriter, r *http.Request, code int, v interface{})) bool {
// verify if request canceled even if there is no error, early exit point
if httputil.IsRequestCanceled(r) {
if r != nil && httputil.IsRequestCanceled(r) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When r can be nil?

Looks kinda weird that this change is introduced only now...

}
ctx := r.Context()
c.LogAction(ctx, "create_commit", r, repository, branch, "")
c.LogAction(ctx, "commit", r, repository, branch, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's a good idea to change log names for existing operations -
Please check if that's the case.

If it's problematic, then prefer keeping the current log name, and align newer logs according to it.

UGCPrepareInterval time.Duration
signingKey config.SecureString
// The API callback (handleAPIErrorCallback from the controller) is used to classify the error inside the RunBackgroundTaskSteps function.
APIErrorCB func(ctx context.Context, w http.ResponseWriter, r *http.Request, err error, cb func(w http.ResponseWriter, r *http.Request, code int, v interface{})) bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe extract func(ctx context.Context, w http.ResponseWriter, r *http.Request, err error, cb func(w http.ResponseWriter, r *http.Request, code int, v interface{})) bool into a type, for cleaner code?

@itaigilo itaigilo requested a review from N-o-Z November 28, 2025 09:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

include-changelog PR description should be included in next release changelog

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Build the infrastructure for the async commit and merge

4 participants