Skip to content

PLUGIN-1823: Retrying all SQLTransientExceptions #597

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

sgarg-CS
Copy link
Contributor

@sgarg-CS sgarg-CS commented May 16, 2025

PLUGIN-1823

Add Failsafe Retry poilcy to all the places in the database-plugins where SQLTransientException could be thrown.

Added three new properties (hidden from UI)

  • Initial Retry Duration (Default: 5sec)
  • Max Retry Duration (Default: 80 sec)
  • Max Retry Count (Default: 5)

Copy link

google-cla bot commented May 16, 2025

Thanks for your pull request! It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA).

View this failed invocation of the CLA check for more information.

For the most up to date status, view the checks section at the bottom of the pull request.

@psainics psainics force-pushed the patch/plugin-1823 branch from 7fcb0a0 to 9da75b8 Compare May 19, 2025 09:01
@psainics psainics added the build label May 19, 2025
@sgarg-CS sgarg-CS force-pushed the patch/plugin-1823 branch from 9da75b8 to ac813f0 Compare May 26, 2025 05:28
@sgarg-CS sgarg-CS added build and removed build labels May 27, 2025
@sgarg-CS sgarg-CS requested a review from itsankit-google May 30, 2025 04:56
Copy link
Member

@itsankit-google itsankit-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, this looks everything is getting wrapped within Failsafe where we might end up with having nested level retries, we need to ensure we add retries only where we are actually interacting with JDBC client and not top level functions.

For example adding retries to DriverManager.getConnection(connectionString, connectionProperties) makes sense because you are actually interacting with the source db but adding retries to whole loadSchema(Connection connection, String query) do not makes sense we need to be careful while adding such retries.

@itsankit-google
Copy link
Member

Please note E2E should not be modified and not fail with these changes. Otherwise, we have done something wrong which does not give expected failure messages.

Copy link
Member

@itsankit-google itsankit-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can see some level of duplication in both AbstractDBSource & AbstractDBSink, can we please move it to the common AbstractDBUtil class?

Copy link
Member

@itsankit-google itsankit-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

public final class RetryUtils {

  public static Connection createConnectionWithRetry(RetryPolicy<?> retryPolicy, String connectionString,
                                                     Properties connectionProperties, String externalDocumentationLink) throws Exception {
    try {
      return Failsafe.with(retryPolicy).get(() ->
        DriverManager.getConnection(connectionString, connectionProperties)
      );
    } catch (Exception e) {
      throw unwrapFailsafeException(e, externalDocumentationLink);
    }
  }

  public static Statement createStatementWithRetry(RetryPolicy<?> retryPolicy,
                                                   Connection connection, String externalDocumentationLink) throws Exception {
    try {
      return Failsafe.with(retryPolicy).get(connection::createStatement);
    } catch (Exception e) {
      throw unwrapFailsafeException(e, externalDocumentationLink);
    }
  }

  public static PreparedStatement prepareStatementWithRetry(RetryPolicy<?> retryPolicy,
                                                            Connection connection,
                                                            String sqlQuery, String externalDocumentationLink) throws Exception {
    try {
      return Failsafe.with(retryPolicy).get(() ->
        connection.prepareStatement(sqlQuery)
      );
    } catch (Exception e) {
      throw unwrapFailsafeException(e, externalDocumentationLink);
    }
  }

 public static ResultSet executeWithRetry(RetryPolicy<?> retryPolicy,
                                                            Connection connection,
                                                            String sqlQuery, String externalDocumentationLink) throws Exception {
        try {
            return Failsafe.with(retryPolicy).get(() -> connection.createStatement().executeQuery(sqlQuery));
        } catch (Exception e) {
            throw unwrapFailsafeException(e, externalDocumentationLink);
        }
    }

 private static Exception unwrapFailsafeException(Exception e) {
    if (e instanceof FailsafeException && e.getCause() instanceof Exception) {
        if (e instanceOf SQLException) {
           return programFailureException(e, externalDocumentationLink);
        } else {
          return (Exception) e.getCause();
       }
    }
    return e;
  }
  
private static ProgramFailureException programFailureException(SQLException e, String externalDocumentationLink) {
    // wrap exception to ensure SQLException-child instances not exposed to contexts without jdbc
    // driver in classpath
    String errorMessage =
      String.format("SQL Exception occurred: [Message='%s', SQLState='%s', ErrorCode='%s'].",
        e.getMessage(), e.getSQLState(), e.getErrorCode());
    String errorMessageWithDetails = String.format("Error occurred while trying to" +
      " get schema from database." + "Error message: '%s'. Error code: '%s'. SQLState: '%s'", e.getMessage(),
        e.getErrorCode(), e.getSQLState());
 
    if (!Strings.isNullOrEmpty(externalDocumentationLink)) {
      if (!errorMessage.endsWith(".")) {
        errorMessage = errorMessage + ".";
      }
      errorMessage = String.format("%s For more details, see %s", errorMessage, externalDocumentationLink);
    }
    return ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN),
      errorMessage, errorMessageWithDetails, ErrorType.USER, false, ErrorCodeType.SQLSTATE, e.getSQLState(),
        externalDocumentationLink, e);
  }
}

You can create a RetryUtils like above which accepts connection params.

@sgarg-CS
Copy link
Contributor Author

sgarg-CS commented Jun 3, 2025

Overall, this looks everything is getting wrapped within Failsafe where we might end up with having nested level retries, we need to ensure we add retries only where we are actually interacting with JDBC client and not top level functions.

For example adding retries to DriverManager.getConnection(connectionString, connectionProperties) makes sense because you are actually interacting with the source db but adding retries to whole loadSchema(Connection connection, String query) do not makes sense we need to be careful while adding such retries.

Refactored the code to add the retry logic only for the methods interacting with the JDBC client.

Copy link
Member

@itsankit-google itsankit-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does it affect the UI if validateSchema fails for DB sinks? Can we test before/after the change?

@sgarg-CS
Copy link
Contributor Author

sgarg-CS commented Jun 5, 2025

How does it affect the UI if validateSchema fails for DB sinks? Can we test before/after the change?

Yes. Will check and update the behaviour here.

I've reverted the changes done to handle SQLException thrown by validateSchema() method call from AbstractDBSink.configurePipeline class. Still, I see a change in the error message on the UI. This is probably due to catching SQLException and then wrapping it to throw ProgramFailureException. While this wrapping was done for inferSchema() but not for validateSchema() in AbstractSink class earlier.

Test Scenario: Validate the schema in PostgreSQL Sink Plugin, if connection is not active. (Postgres DB is down)

[BEFORE CHANGES]

Error message on the UI:
Exception while trying to validate schema of database table '"users2"' for connection 'jdbc:postgresql://localhost:5433/postgres' with Connection to localhost:5433 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.

[AFTER CHANGES]

Error message on the UI:
Error encountered while configuring the stage: 'Error occurred while trying to get schema from database. Error message: 'Connection to localhost:5433 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.'. Error code: '0'. SQLState: '08001''

Is the new error message acceptable or should we revert the changes done to throw ProgramFailureException for validateSchema() method?

@itsankit-google
Copy link
Member

How does it affect the UI if validateSchema fails for DB sinks? Can we test before/after the change?

Yes. Will check and update the behaviour here.

I've reverted the changes done to handle SQLException thrown by validateSchema() method call from AbstractDBSink.configurePipeline class. Still, I see a change in the error message on the UI. This is probably due to catching SQLException and then wrapping it to throw ProgramFailureException. While this wrapping was done for inferSchema() but not for validateSchema() in AbstractSink class earlier.

Test Scenario: Validate the schema in PostgreSQL Sink Plugin, if connection is not active. (Postgres DB is down)

[BEFORE CHANGES]

Error message on the UI: Exception while trying to validate schema of database table '"users2"' for connection 'jdbc:postgresql://localhost:5433/postgres' with Connection to localhost:5433 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.

[AFTER CHANGES]

Error message on the UI: Error encountered while configuring the stage: 'Error occurred while trying to get schema from database. Error message: 'Connection to localhost:5433 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.'. Error code: '0'. SQLState: '08001''

Is the new error message acceptable or should we revert the changes done to throw ProgramFailureException for validateSchema() method?

LGTM

Comment on lines 385 to 386
protected String getExternalDocumentationLink() {
return null;
return "https://en.wikipedia.org/wiki/SQLSTATE";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this method still needed? can we remove it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed now. Removed. 7c8815d

Comment on lines 165 to 167
protected String getExternalDocumentationLink() {
return "https://en.wikipedia.org/wiki/SQLSTATE";
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this method still needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not needed now. Removed it. 7c8815d

@@ -87,13 +86,17 @@ public abstract class AbstractDBSource<T extends PluginConfig & DatabaseSourceCo
Pattern.CASE_INSENSITIVE);
private static final Pattern WHERE_CONDITIONS = Pattern.compile("\\s+where \\$conditions",
Pattern.CASE_INSENSITIVE);
private final RetryPolicy<?> retryPolicy;

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please remove empty line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed 0b57ce6

@itsmekumari itsmekumari force-pushed the patch/plugin-1823 branch 3 times, most recently from c6de31b to 26efc64 Compare July 7, 2025 10:26
@@ -1,9 +1,9 @@
errorMessageInvalidSourceDatabase=SQL error while getting query schema: Error: Unknown database 'invalidDatabase', SQLState: 42000, ErrorCode: 1049
errorMessageInvalidSourceDatabase=errorMessage: SQL Exception occurred
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still see generic error messages which is bad.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The earlier error message is changed as per the new code, and hence as per the latest code updated the error message not include the dynamic/privacy data present in error msgs here it contains.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As suggested, updated the error message.

Copy link
Member

@itsankit-google itsankit-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we please add Open and Capture Logs step to these scenarios :

} else if (cause instanceof RuntimeException) {
return (RuntimeException) cause;
} else if (cause instanceof Error) {
return new RuntimeException("Failsafe wrapped an Error", cause);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return new RuntimeException("Operation failed with error", cause);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated 7977d45

} else if (cause instanceof Error) {
return new RuntimeException("Failsafe wrapped an Error", cause);
} else {
return new RuntimeException("Failsafe wrapped a non-runtime exception", cause);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return new RuntimeException("Operation failed", cause);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated 7977d45

@itsmekumari
Copy link
Contributor

can we please add Open and Capture Logs step to these scenarios :

Can we address these changes in other PR? We will create a new ticket to address these changes. Please confirm.

@itsankit-google
Copy link
Member

Can we address these changes in other PR? We will create a new ticket to address these changes. Please confirm.

I am not sure how to verify the e2e changes then?

@itsmekumari
Copy link
Contributor

Can we address these changes in other PR? We will create a new ticket to address these changes. Please confirm.

I am not sure how to verify the e2e changes then?

Changes done.

@sgarg-CS sgarg-CS force-pushed the patch/plugin-1823 branch from 7977d45 to 40ff60a Compare July 14, 2025 06:07
@sgarg-CS sgarg-CS force-pushed the patch/plugin-1823 branch from 7e16744 to 3f332d1 Compare July 17, 2025 05:46
@sgarg-CS
Copy link
Contributor Author

hi @itsankit-google, Please let me know if there are any new review comments to be addressed. If not, can we approve it?

@sgarg-CS sgarg-CS force-pushed the patch/plugin-1823 branch 2 times, most recently from 148f5a7 to 766132b Compare August 7, 2025 07:38
@sgarg-CS sgarg-CS force-pushed the patch/plugin-1823 branch from 766132b to 40264b9 Compare August 7, 2025 07:39
connection, String.format("SELECT %s FROM %s WHERE 1 = 0", dbColumns, fullyQualifiedTableName),
getErrorDetailsProvider());
ResultSet rs = RetryUtils.executeQueryWithRetry((RetryPolicy<ResultSet>) retryPolicy, pStmt,
getErrorDetailsProvider())) {
getFieldsValidator().validateFields(inputSchema, rs, collector);
}
} catch (SQLException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this catch is needed now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's needed to handle the SQLException thrown from the auto-closeable resource used in the try with resources block.

Copy link
Member

@itsankit-google itsankit-google Aug 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both the places where ever we are connecting to SQL driver, I can see RetryUtils getting used, when will it go in this cache clause?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might go.

As we're using try-with-resources, this will auto close the resources being used by implicitly calling close() method on each resource in the reverse order of declaration.
Since close() can throw SQLException, the compiler enforces that we either:

  • catch it, or
  • declare it in the method signature

Furthermore, we also have methods such as getMetaData(), getTables() and next() that can possibly throw SQLException

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add sqlState and errorCode in the error message of catch block as well?

inferredFields.addAll(getSchemaReader().getSchemaFields(rs));
}
} catch (SQLException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also add it here then?

Otherwise it will fallback to lower one which will show incorrect error message

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants