Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
Expand Down Expand Up @@ -413,6 +414,28 @@ The name of the database (or the name of the catalog, depending on the destinati
.required(false)
.build();

static final PropertyDescriptor SQL_PRE_QUERY = new PropertyDescriptor.Builder()
.name("SQL Pre-Query")
.description("A semicolon-delimited list of queries executed before the main SQL query is executed. " +
"For example, set session properties before main query. " +
"It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). " +
"Results/outputs from these queries will be suppressed if there are no errors.")
Comment on lines +419 to +422
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A multiline string can be used instead of string concatenation to build the description.

.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();

static final PropertyDescriptor SQL_POST_QUERY = new PropertyDescriptor.Builder()
.name("SQL Post-Query")
.description("A semicolon-delimited list of queries executed after the main SQL query is executed. " +
"Example like setting session properties after main query. " +
"It's possible to include semicolons in the statements themselves by escaping them with a backslash ('\\;'). " +
"Results/outputs from these queries will be suppressed if there are no errors.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(FLOWFILE_ATTRIBUTES)
.build();

static final PropertyDescriptor DB_TYPE = DatabaseAdapterDescriptor.getDatabaseTypeDescriptor();
static final PropertyDescriptor DATABASE_DIALECT_SERVICE = DatabaseAdapterDescriptor.getDatabaseDialectServiceDescriptor(DB_TYPE);

Expand Down Expand Up @@ -443,7 +466,9 @@ The name of the database (or the name of the catalog, depending on the destinati
RollbackOnFailure.ROLLBACK_ON_FAILURE,
TABLE_SCHEMA_CACHE_SIZE,
MAX_BATCH_SIZE,
AUTO_COMMIT
AUTO_COMMIT,
SQL_PRE_QUERY,
SQL_POST_QUERY
);

private Cache<SchemaKey, TableSchema> schemaCache;
Expand Down Expand Up @@ -558,6 +583,9 @@ public void onTrigger(final ProcessContext context, final ProcessSession session

final DBCPService dbcpService = context.getProperty(DBCP_SERVICE).asControllerService(DBCPService.class);

List<String> preQueries = getQueries(context.getProperty(SQL_PRE_QUERY).evaluateAttributeExpressions(flowFile).getValue());
List<String> postQueries = getQueries(context.getProperty(SQL_POST_QUERY).evaluateAttributeExpressions(flowFile).getValue());

Connection connection = null;
boolean originalAutoCommit = false;
try {
Expand All @@ -573,8 +601,19 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
}
}

Pair<String, SQLException> failure = executeConfigStatements(connection, preQueries);
if (failure != null) {
// In case of failure, assigning config query to "selectQuery" to follow current error handling
throw failure.getRight();
}

putToDatabase(context, session, flowFile, connection);

failure = executeConfigStatements(connection, postQueries);
if (failure != null) {
throw failure.getRight();
}

// If the connection's auto-commit setting is false, then manually commit the transaction
if (!connection.getAutoCommit()) {
connection.commit();
Expand Down Expand Up @@ -1629,6 +1668,42 @@ private Set<String> normalizeKeyColumnNamesAndCheckForValues(RecordSchema record
return normalizedKeyColumnNames;
}

/*
* Extract list of queries from config property
*/
private List<String> getQueries(final String value) {
if (value == null || value.isEmpty() || value.isBlank()) {
return null;
}
final List<String> queries = new ArrayList<>();
for (String query : value.split("(?<!\\\\);")) {
query = query.replaceAll("\\\\;", ";");
if (!query.isBlank()) {
queries.add(query.trim());
}
}
return queries;
}

/*
* Executes given queries using pre-defined connection.
* Returns null on success, or a query string if failed.
*/
private Pair<String, SQLException> executeConfigStatements(final Connection con, final List<String> configQueries) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catching and returning the exception, just to throw it from the calling method, does not seem like the best approach. I recommend simply declaring throws SQLException on this method and not returning anything.

if (configQueries == null || configQueries.isEmpty()) {
return null;
}

for (String confSQL : configQueries) {
try (final Statement st = con.createStatement()) {
st.execute(confSQL);
} catch (SQLException e) {
return Pair.of(confSQL, e);
}
}
return null;
}

private boolean isSupportsBatchUpdates(Connection connection) {
try {
return connection.getMetaData().supportsBatchUpdates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2157,6 +2157,234 @@ public void testInsertLongVarBinaryColumn() throws InitializationException, Proc
conn.close();
}

@Test
public void testInsertWithPreQuery() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());

recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);

parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
parser.addSchemaField("dt", RecordFieldType.DATE);

LocalDate testDate1 = LocalDate.of(2021, 1, 26);
Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26);
Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ

parser.addRecord(1, "rec1", 101, jdbcDate1);
parser.addRecord(2, "rec2", 102, jdbcDate2);
parser.addRecord(3, "rec3", 103, null);
parser.addRecord(4, "rec4", 104, null);
parser.addRecord(5, null, 105, null);

runner.setProperty(PutDatabaseRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");

runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");

runner.enqueue(new byte[0]);
runner.run();

runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("rec1", rs.getString(2));
assertEquals(101, rs.getInt(3));
assertEquals(jdbcDate1.toString(), rs.getDate(4).toString());
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals("rec2", rs.getString(2));
assertEquals(102, rs.getInt(3));
assertEquals(jdbcDate2.toString(), rs.getDate(4).toString());
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertEquals("rec3", rs.getString(2));
assertEquals(103, rs.getInt(3));
assertNull(rs.getDate(4));
assertTrue(rs.next());
assertEquals(4, rs.getInt(1));
assertEquals("rec4", rs.getString(2));
assertEquals(104, rs.getInt(3));
assertNull(rs.getDate(4));
assertTrue(rs.next());
assertEquals(5, rs.getInt(1));
assertNull(rs.getString(2));
assertEquals(105, rs.getInt(3));
assertNull(rs.getDate(4));
assertFalse(rs.next());
Comment on lines +2198 to +2223
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These assertions appear to be duplicated in the other test method. Recommend creating a shared private assertResultsFound() method or similar to delegate the evaluation.


stmt.close();
conn.close();
}

@Test
public void testInsertWithPreQueryFail() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());

recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);

parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
parser.addSchemaField("dt", RecordFieldType.DATE);

LocalDate testDate1 = LocalDate.of(2021, 1, 26);
Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26);
Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ

parser.addRecord(1, "rec1", 101, jdbcDate1);
parser.addRecord(2, "rec2", 102, jdbcDate2);
parser.addRecord(3, "rec3", 103, null);
parser.addRecord(4, "rec4", 104, null);
parser.addRecord(5, null, 105, null);

runner.setProperty(PutDatabaseRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");

runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");

runner.enqueue(new byte[0]);
runner.run();

runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());

stmt.close();
conn.close();
}

@Test
public void testInsertWithPostQuery() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());

recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);

parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
parser.addSchemaField("dt", RecordFieldType.DATE);

LocalDate testDate1 = LocalDate.of(2021, 1, 26);
Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26);
Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ

parser.addRecord(1, "rec1", 101, jdbcDate1);
parser.addRecord(2, "rec2", 102, jdbcDate2);
parser.addRecord(3, "rec3", 103, null);
parser.addRecord(4, "rec4", 104, null);
parser.addRecord(5, null, 105, null);

runner.setProperty(PutDatabaseRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
runner.setProperty(PutDatabaseRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(0);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(0)");

runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");

runner.enqueue(new byte[0]);
runner.run();

runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertEquals("rec1", rs.getString(2));
assertEquals(101, rs.getInt(3));
assertEquals(jdbcDate1.toString(), rs.getDate(4).toString());
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertEquals("rec2", rs.getString(2));
assertEquals(102, rs.getInt(3));
assertEquals(jdbcDate2.toString(), rs.getDate(4).toString());
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertEquals("rec3", rs.getString(2));
assertEquals(103, rs.getInt(3));
assertNull(rs.getDate(4));
assertTrue(rs.next());
assertEquals(4, rs.getInt(1));
assertEquals("rec4", rs.getString(2));
assertEquals(104, rs.getInt(3));
assertNull(rs.getDate(4));
assertTrue(rs.next());
assertEquals(5, rs.getInt(1));
assertNull(rs.getString(2));
assertEquals(105, rs.getInt(3));
assertNull(rs.getDate(4));
assertFalse(rs.next());

stmt.close();
conn.close();
}

@Test
public void testInsertWithPostQueryFail() throws InitializationException, ProcessException, SQLException {
setRunner(TestCaseEnum.DEFAULT_0.getTestCase());

recreateTable(createPersons);
final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);

parser.addSchemaField("id", RecordFieldType.INT);
parser.addSchemaField("name", RecordFieldType.STRING);
parser.addSchemaField("code", RecordFieldType.INT);
parser.addSchemaField("dt", RecordFieldType.DATE);

LocalDate testDate1 = LocalDate.of(2021, 1, 26);
Date jdbcDate1 = Date.valueOf(testDate1); // in local TZ
LocalDate testDate2 = LocalDate.of(2021, 7, 26);
Date jdbcDate2 = Date.valueOf(testDate2); // in local TZ

parser.addRecord(1, "rec1", 101, jdbcDate1);
parser.addRecord(2, "rec2", 102, jdbcDate2);
parser.addRecord(3, "rec3", 103, null);
parser.addRecord(4, "rec4", 104, null);
parser.addRecord(5, null, 105, null);

runner.setProperty(PutDatabaseRecord.SQL_PRE_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS(1);CALL SYSCS_UTIL.SYSCS_SET_STATISTICS_TIMING(1)");
runner.setProperty(PutDatabaseRecord.SQL_POST_QUERY, "CALL SYSCS_UTIL.SYSCS_SET_RUNTIMESTATISTICS()");

runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");

runner.enqueue(new byte[0]);
runner.run();

runner.assertTransferCount(PutDatabaseRecord.REL_FAILURE, 1);
final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement();
final ResultSet rs = stmt.executeQuery("SELECT * FROM PERSONS");
assertFalse(rs.next());

stmt.close();
conn.close();
}

private void recreateTable() throws ProcessException {
try (final Connection conn = dbcp.getConnection();
final Statement stmt = conn.createStatement()) {
Expand Down
Loading