diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index 39bdda9c241e..7a0c74c86a85 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -19,6 +19,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; @@ -413,6 +414,28 @@ The name of the database (or the name of the catalog, depending on the destinati .required(false) .build(); + static final PropertyDescriptor SQL_PRE_QUERY = new PropertyDescriptor.Builder() + .name("SQL Pre-Query") + .description("A semicolon-delimited list of queries executed before the main SQL query is executed. " + + "For example, set session properties before main query. " + + "It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). " + + "Results/outputs from these queries will be suppressed if there are no errors.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .build(); + + static final PropertyDescriptor SQL_POST_QUERY = new PropertyDescriptor.Builder() + .name("SQL Post-Query") + .description("A semicolon-delimited list of queries executed after the main SQL query is executed. " + + "Example like setting session properties after main query. " + + "It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). " + + "Results/outputs from these queries will be suppressed if there are no errors.") + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(FLOWFILE_ATTRIBUTES) + .build(); + static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor(); static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE); @@ -443,7 +466,9 @@ The name of the database (or the name of the catalog, depending on the destinati RollbackOnFailure.ROLLBACK_ON_FAILURE, TABLE_SCHEMA_CACHE_SIZE, MAX_BATCH_SIZE, - AUTO_COMMIT + AUTO_COMMIT, + SQL_PRE_QUERY, + SQL_POST_QUERY ); private Cache schemaCache; @@ -558,6 +583,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class); + List preQueries = getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(flowFile).getValue()); + List postQueries = getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(flowFile).getValue()); + Connection connection = null; boolean originalAutoCommit = false; try { @@ -573,8 +601,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } + Pair failure = executeConfigStatements(connection, preQueries); + if (failure != null) { + // In case of failure, assigning config query to "selectQuery" to follow current error handling + throw failure.getRight(); + } + putToDatabase(context, session, flowFile, connection); + failure = executeConfigStatements(connection, postQueries); + if (failure != null) { + throw failure.getRight(); + } + // If the connection's auto-commit setting is false, then manually commit the transaction if (!connection.getAutoCommit()) { connection.commit(); @@ -1629,6 +1668,42 @@ private Set normalizeKeyColumnNamesAndCheckForValues(RecordSchema record return normalizedKeyColumnNames; } + /* + * Extract list of queries from config property + */ + private List getQueries(final String value) { + if (value == null || value.isEmpty() || value.isBlank()) { + return null; + } + final List queries = new ArrayList<>(); + for (String query : value.split("(? executeConfigStatements(final Connection con, final List configQueries) { + if (configQueries == null || configQueries.isEmpty()) { + return null; + } + + for (String confSQL : configQueries) { + try (final Statement st = con.createStatement()) { + st.execute(confSQL); + } catch (SQLException e) { + return Pair.of(confSQL, e); + } + } + return null; + } + private boolean isSupportsBatchUpdates(Connection connection) { try { return connection.getMetaData().supportsBatchUpdates(); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index 090345c0164b..e85ac1dc4ec3 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -2157,6 +2157,234 @@ public void testInsertLongVarBinaryColumn() throws InitializationException, Proc conn.close(); } + @Test + public void testInsertWithPreQuery() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + + recreateTable(createPersons); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("code", RecordFieldType.INT); + parser.addSchemaField("dt", RecordFieldType.DATE); + + LocalDate testDate1 = LocalDate.of(2021, 1, 26); + Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ + LocalDate testDate2 = LocalDate.of(2021, 7, 26); + Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ + + parser.addRecord(1, "rec1", 101, jdbcDate1); + parser.addRecord(2, "rec2", 102, jdbcDate2); + parser.addRecord(3, "rec3", 103, null); + parser.addRecord(4, "rec4", 104, null); + parser.addRecord(5, null, 105, null); + + runner.setProperty(PutDatabaseRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("rec1", rs.getString(2)); + assertEquals(101, rs.getInt(3)); + assertEquals(jdbcDate1.toString(), rs.getDate(4).toString()); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("rec2", rs.getString(2)); + assertEquals(102, rs.getInt(3)); + assertEquals(jdbcDate2.toString(), rs.getDate(4).toString()); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + assertEquals("rec3", rs.getString(2)); + assertEquals(103, rs.getInt(3)); + assertNull(rs.getDate(4)); + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); + assertEquals("rec4", rs.getString(2)); + assertEquals(104, rs.getInt(3)); + assertNull(rs.getDate(4)); + assertTrue(rs.next()); + assertEquals(5, rs.getInt(1)); + assertNull(rs.getString(2)); + assertEquals(105, rs.getInt(3)); + assertNull(rs.getDate(4)); + assertFalse(rs.next()); + + stmt.close(); + conn.close(); + } + + @Test + public void testInsertWithPreQueryFail() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + + recreateTable(createPersons); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("code", RecordFieldType.INT); + parser.addSchemaField("dt", RecordFieldType.DATE); + + LocalDate testDate1 = LocalDate.of(2021, 1, 26); + Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ + LocalDate testDate2 = LocalDate.of(2021, 7, 26); + Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ + + parser.addRecord(1, "rec1", 101, jdbcDate1); + parser.addRecord(2, "rec2", 102, jdbcDate2); + parser.addRecord(3, "rec3", 103, null); + parser.addRecord(4, "rec4", 104, null); + parser.addRecord(5, null, 105, null); + + runner.setProperty(PutDatabaseRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()"); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + + stmt.close(); + conn.close(); + } + + @Test + public void testInsertWithPostQuery() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + + recreateTable(createPersons); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("code", RecordFieldType.INT); + parser.addSchemaField("dt", RecordFieldType.DATE); + + LocalDate testDate1 = LocalDate.of(2021, 1, 26); + Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ + LocalDate testDate2 = LocalDate.of(2021, 7, 26); + Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ + + parser.addRecord(1, "rec1", 101, jdbcDate1); + parser.addRecord(2, "rec2", 102, jdbcDate2); + parser.addRecord(3, "rec3", 103, null); + parser.addRecord(4, "rec4", 104, null); + parser.addRecord(5, null, 105, null); + + runner.setProperty(PutDatabaseRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); + runner.setProperty(PutDatabaseRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)"); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("rec1", rs.getString(2)); + assertEquals(101, rs.getInt(3)); + assertEquals(jdbcDate1.toString(), rs.getDate(4).toString()); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("rec2", rs.getString(2)); + assertEquals(102, rs.getInt(3)); + assertEquals(jdbcDate2.toString(), rs.getDate(4).toString()); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + assertEquals("rec3", rs.getString(2)); + assertEquals(103, rs.getInt(3)); + assertNull(rs.getDate(4)); + assertTrue(rs.next()); + assertEquals(4, rs.getInt(1)); + assertEquals("rec4", rs.getString(2)); + assertEquals(104, rs.getInt(3)); + assertNull(rs.getDate(4)); + assertTrue(rs.next()); + assertEquals(5, rs.getInt(1)); + assertNull(rs.getString(2)); + assertEquals(105, rs.getInt(3)); + assertNull(rs.getDate(4)); + assertFalse(rs.next()); + + stmt.close(); + conn.close(); + } + + @Test + public void testInsertWithPostQueryFail() throws InitializationException, ProcessException, SQLException { + setRunner(TestCaseEnum.DEFAULT_0.getTestCase()); + + recreateTable(createPersons); + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + parser.addSchemaField("id", RecordFieldType.INT); + parser.addSchemaField("name", RecordFieldType.STRING); + parser.addSchemaField("code", RecordFieldType.INT); + parser.addSchemaField("dt", RecordFieldType.DATE); + + LocalDate testDate1 = LocalDate.of(2021, 1, 26); + Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ + LocalDate testDate2 = LocalDate.of(2021, 7, 26); + Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ + + parser.addRecord(1, "rec1", 101, jdbcDate1); + parser.addRecord(2, "rec2", 102, jdbcDate2); + parser.addRecord(3, "rec3", 103, null); + parser.addRecord(4, "rec4", 104, null); + parser.addRecord(5, null, 105, null); + + runner.setProperty(PutDatabaseRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)"); + runner.setProperty(PutDatabaseRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()"); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1); + final Connection conn = dbcp.getConnection(); + final Statement stmt = conn.createStatement(); + final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS"); + assertFalse(rs.next()); + + stmt.close(); + conn.close(); + } + private void recreateTable() throws ProcessException { try (final Connection conn = dbcp.getConnection(); final Statement stmt = conn.createStatement()) {