Skip to content

Commit 2b9512b

Browse files
committed
Add ON UPDATE support to teleport
1 parent 4e678c1 commit 2b9512b

File tree

13 files changed

+332
-10
lines changed

13 files changed

+332
-10
lines changed

v1/src/main/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN;
2222
import static com.google.cloud.teleport.spanner.AvroUtil.INPUT;
2323
import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL;
24+
import static com.google.cloud.teleport.spanner.AvroUtil.ON_UPDATE_EXPRESSION;
2425
import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT;
2526
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_CHANGE_STREAM_FOR_CLAUSE;
2627
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_CHECK_CONSTRAINT;
@@ -632,13 +633,18 @@ public Table toTable(String tableName, Schema schema) {
632633
if (nullable) {
633634
avroType = unpacked;
634635
}
636+
} else {
637+
String notNull = f.getProp(NOT_NULL);
638+
nullable = notNull != null && !Boolean.parseBoolean(notNull);
635639
}
636640
if (Strings.isNullOrEmpty(sqlType)) {
637641
Type spannerType = inferType(avroType, true);
638642
sqlType = SizedType.typeString(spannerType, -1, true);
639643
}
640644
String defaultExpression = f.getProp(DEFAULT_EXPRESSION);
641645
column.parseType(sqlType).notNull(!nullable).defaultExpression(defaultExpression);
646+
String onUpdateExpression = f.getProp(ON_UPDATE_EXPRESSION);
647+
column.parseType(sqlType).notNull(!nullable).onUpdateExpression(onUpdateExpression);
642648
}
643649
String hidden = f.getProp(HIDDEN);
644650
if (Boolean.parseBoolean(hidden)) {

v1/src/main/java/com/google/cloud/teleport/spanner/AvroUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ private AvroUtil() {}
2424

2525
// The property names in Avro schema.
2626
public static final String DEFAULT_EXPRESSION = "defaultExpression";
27+
public static final String ON_UPDATE_EXPRESSION = "onUpdateExpression";
2728
public static final String GENERATION_EXPRESSION = "generationExpression";
2829
public static final String GOOGLE_FORMAT_VERSION = "googleFormatVersion";
2930
public static final String GOOGLE_STORAGE = "googleStorage";

v1/src/main/java/com/google/cloud/teleport/spanner/DdlToAvroSchemaConverter.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static com.google.cloud.teleport.spanner.AvroUtil.IDENTITY_COLUMN;
2424
import static com.google.cloud.teleport.spanner.AvroUtil.INPUT;
2525
import static com.google.cloud.teleport.spanner.AvroUtil.NOT_NULL;
26+
import static com.google.cloud.teleport.spanner.AvroUtil.ON_UPDATE_EXPRESSION;
2627
import static com.google.cloud.teleport.spanner.AvroUtil.OUTPUT;
2728
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_CHANGE_STREAM_FOR_CLAUSE;
2829
import static com.google.cloud.teleport.spanner.AvroUtil.SPANNER_CHECK_CONSTRAINT;
@@ -232,6 +233,9 @@ public Collection<Schema> convert(Ddl ddl) {
232233
}
233234
} else if (cm.defaultExpression() != null) {
234235
fieldBuilder.prop(DEFAULT_EXPRESSION, cm.defaultExpression());
236+
if (cm.onUpdateExpression() != null) {
237+
fieldBuilder.prop(ON_UPDATE_EXPRESSION, cm.onUpdateExpression());
238+
}
235239
}
236240
Schema avroType = avroType(cm.type(), table.name() + "_" + columnOrdinal++);
237241
if (!cm.notNull()) {

v1/src/main/java/com/google/cloud/teleport/spanner/ddl/Column.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ public abstract class Column implements Serializable {
7878
@Nullable
7979
public abstract String defaultExpression();
8080

81+
@Nullable
82+
public abstract String onUpdateExpression();
83+
8184
public static Builder builder(Dialect dialect) {
8285
return new AutoValue_Column.Builder()
8386
.dialect(dialect)
@@ -113,6 +116,14 @@ public void prettyPrint(Appendable appendable) throws IOException {
113116
appendable.append(" (").append(defaultExpression()).append(")");
114117
}
115118
}
119+
if (onUpdateExpression() != null) {
120+
appendable.append(" ON UPDATE ");
121+
if (dialect() == Dialect.POSTGRESQL) {
122+
appendable.append(onUpdateExpression());
123+
} else {
124+
appendable.append(" (").append(onUpdateExpression()).append(")");
125+
}
126+
}
116127
if (isIdentityColumn()) {
117128
appendable.append(" GENERATED BY DEFAULT AS IDENTITY");
118129
List<String> options = new ArrayList<>(3);
@@ -227,6 +238,8 @@ public Builder notNull() {
227238

228239
public abstract Builder defaultExpression(String expression);
229240

241+
public abstract Builder onUpdateExpression(String expression);
242+
230243
public Builder generatedAs(String expression) {
231244
return isGenerated(true).generationExpression(expression);
232245
}

v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -391,10 +391,11 @@ private void listColumns(Ddl.Builder builder) {
391391
resultSet.isNull(13) ? null : Long.valueOf(resultSet.getString(13));
392392
Long identitySkipRangeMax =
393393
resultSet.isNull(14) ? null : Long.valueOf(resultSet.getString(14));
394+
String onUpdateExpression = resultSet.isNull(15) ? null : resultSet.getString(15);
394395
boolean isHidden =
395396
dialect == Dialect.GOOGLE_STANDARD_SQL
396-
? resultSet.getBoolean(15)
397-
: resultSet.getString(15).equalsIgnoreCase("YES");
397+
? resultSet.getBoolean(16)
398+
: resultSet.getString(16).equalsIgnoreCase("YES");
398399
boolean isPlacementKey = resultSet.getBoolean(16);
399400

400401
builder
@@ -407,6 +408,7 @@ private void listColumns(Ddl.Builder builder) {
407408
.generationExpression(generationExpression)
408409
.isStored(isStored)
409410
.defaultExpression(defaultExpression)
411+
.onUpdateExpression(onUpdateExpression)
410412
.isIdentityColumn(isIdentity)
411413
.sequenceKind(sequenceKind)
412414
.counterStartValue(identityStartWithCounter)
@@ -434,7 +436,8 @@ Statement listColumnsSQL() {
434436
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
435437
+ " c.is_generated, c.generation_expression, c.is_stored,"
436438
+ " c.column_default, c.is_identity, c.identity_kind, c.identity_start_with_counter,"
437-
+ " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden,"
439+
+ " c.identity_skip_range_min, c.identity_skip_range_max,"
440+
+ " c.on_update_expression, c.is_hidden,"
438441
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
439442
+ " FROM information_schema.columns as c"
440443
+ " LEFT JOIN placementkeycolumns AS pkc"
@@ -450,7 +453,8 @@ Statement listColumnsSQL() {
450453
+ " c.ordinal_position, c.spanner_type, c.is_nullable,"
451454
+ " c.is_generated, c.generation_expression, c.is_stored, c.column_default,"
452455
+ " c.is_identity, c.identity_kind, c.identity_start_with_counter, "
453-
+ " c.identity_skip_range_min, c.identity_skip_range_max, c.is_hidden,"
456+
+ " c.identity_skip_range_min, c.identity_skip_range_max,"
457+
+ " c.on_update_expression, c.is_hidden,"
454458
+ " pkc.constraint_name IS NOT NULL AS is_placement_key"
455459
+ " FROM information_schema.columns as c"
456460
+ " LEFT JOIN placementkeycolumns AS pkc"

v1/src/test/java/com/google/cloud/teleport/spanner/AvroSchemaToDdlConverterTest.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,21 @@ public void simple() {
100100
+ " \"skipRangeMax\" : \"3000\","
101101
+ " \"counterStartValue\" : \"1000\""
102102
+ " }, {"
103+
+ " \"name\" : \"default_commit_ts\","
104+
+ " \"type\" : \"null\","
105+
+ " \"sqlType\" : \"TIMESTAMP\","
106+
+ " \"notNull\" : \"false\","
107+
+ " \"defaultExpression\" : \"PENDING_COMMIT_TIMESTAMP()\","
108+
+ " \"spannerOption_0\" : \"allow_commit_timestamp=true\""
109+
+ " }, {"
110+
+ " \"name\" : \"on_update_ts\","
111+
+ " \"type\" : \"null\","
112+
+ " \"sqlType\" : \"TIMESTAMP\","
113+
+ " \"notNull\" : \"false\","
114+
+ " \"defaultExpression\" : \"PENDING_COMMIT_TIMESTAMP()\","
115+
+ " \"onUpdateExpression\" : \"PENDING_COMMIT_TIMESTAMP()\","
116+
+ " \"spannerOption_0\" : \"allow_commit_timestamp=true\""
117+
+ " }, {"
103118
+ " \"name\" : \"numeric\","
104119
+ " \"type\" : [\"null\", {\"type\":\"bytes\",\"logicalType\":\"decimal\"}],"
105120
+ " \"sqlType\" : \"NUMERIC\""
@@ -243,7 +258,6 @@ public void simple() {
243258
+ "}";
244259

245260
Schema schema = new Schema.Parser().parse(avroString);
246-
247261
AvroSchemaToDdlConverter converter = new AvroSchemaToDdlConverter();
248262
Ddl ddl = converter.toDdl(Collections.singleton(schema));
249263
assertThat(ddl.allTables(), hasSize(1));
@@ -261,6 +275,11 @@ public void simple() {
261275
+ "BIT_REVERSED_POSITIVE SKIP RANGE 2000, 3000 START COUNTER WITH 1000),"
262276
+ " `identity_column_no_kind` INT64 GENERATED BY DEFAULT AS IDENTITY ("
263277
+ "SKIP RANGE 2000, 3000 START COUNTER WITH 1000),"
278+
+ " `default_commit_ts` TIMESTAMP DEFAULT (PENDING_COMMIT_TIMESTAMP())"
279+
+ " OPTIONS (allow_commit_timestamp=true),"
280+
+ " `on_update_ts` TIMESTAMP DEFAULT (PENDING_COMMIT_TIMESTAMP())"
281+
+ " ON UPDATE (PENDING_COMMIT_TIMESTAMP())"
282+
+ " OPTIONS (allow_commit_timestamp=true),"
264283
+ " `numeric` NUMERIC,"
265284
+ " `numeric2` NUMERIC,"
266285
+ " `notNumeric` BYTES(MAX),"
@@ -359,6 +378,19 @@ public void pgSimple() {
359378
+ " \"skipRangeMax\" : \"3000\","
360379
+ " \"counterStartValue\" : \"1000\""
361380
+ " }, {"
381+
+ " \"name\" : \"default_commit_ts\","
382+
+ " \"type\" : [ \"null\", \"string\" ],"
383+
+ " \"sqlType\" : \"spanner.commit_timestamp\","
384+
+ " \"notNull\" : \"false\","
385+
+ " \"defaultExpression\" : \"SPANNER.PENDING_COMMIT_TIMESTAMP()\""
386+
+ " }, {"
387+
+ " \"name\" : \"on_update_ts\","
388+
+ " \"type\" : [ \"null\", \"string\" ],"
389+
+ " \"sqlType\" : \"spanner.commit_timestamp\","
390+
+ " \"notNull\" : \"false\","
391+
+ " \"defaultExpression\" : \"SPANNER.PENDING_COMMIT_TIMESTAMP()\","
392+
+ " \"onUpdateExpression\" : \"SPANNER.PENDING_COMMIT_TIMESTAMP()\""
393+
+ " }, {"
362394
+ " \"name\" : \"numeric\","
363395
+ " \"type\" : [\"null\", {\"type\":\"bytes\",\"logicalType\":\"decimal\"}],"
364396
+ " \"sqlType\" : \"numeric\""
@@ -499,6 +531,11 @@ public void pgSimple() {
499531
+ "BIT_REVERSED_POSITIVE SKIP RANGE 2000 3000 START COUNTER WITH 1000),"
500532
+ " \"identity_column_no_kind\" bigint GENERATED BY DEFAULT AS IDENTITY ("
501533
+ "SKIP RANGE 2000 3000 START COUNTER WITH 1000),"
534+
+ " \"default_commit_ts\" spanner.commit_timestamp "
535+
+ " DEFAULT SPANNER.PENDING_COMMIT_TIMESTAMP(),"
536+
+ " \"on_update_ts\" spanner.commit_timestamp "
537+
+ " DEFAULT SPANNER.PENDING_COMMIT_TIMESTAMP()"
538+
+ " ON UPDATE SPANNER.PENDING_COMMIT_TIMESTAMP(),"
502539
+ " \"numeric\" numeric,"
503540
+ " \"numeric2\" numeric,"
504541
+ " \"notNumeric\" bytea,"

v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,6 +1001,76 @@ public void pgIdentityColumn() throws Exception {
10011001
runTest(Dialect.POSTGRESQL);
10021002
}
10031003

1004+
@Test
1005+
public void commitTimestampColumns() throws Exception {
1006+
// spotless:off
1007+
Ddl.Builder ddlBuilder = Ddl.builder();
1008+
List<Export.DatabaseOption> dbOptionList = new ArrayList<>();
1009+
dbOptionList.add(
1010+
Export.DatabaseOption.newBuilder()
1011+
.setOptionName("default_sequence_kind")
1012+
.setOptionValue("\"bit_reversed_positive\"")
1013+
.build());
1014+
ddlBuilder.mergeDatabaseOptions(dbOptionList);
1015+
Ddl ddl = ddlBuilder
1016+
.createTable("CommitTimestampTable")
1017+
.column("id")
1018+
.int64()
1019+
.endColumn()
1020+
.column("default_commit_ts")
1021+
.type(Type.timestamp())
1022+
.defaultExpression("PENDING_COMMIT_TIMESTAMP()")
1023+
.columnOptions(ImmutableList.of("allow_commit_timestamp=TRUE"))
1024+
.endColumn()
1025+
.column("on_update_ts")
1026+
.type(Type.timestamp())
1027+
.defaultExpression("PENDING_COMMIT_TIMESTAMP()")
1028+
.onUpdateExpression("PENDING_COMMIT_TIMESTAMP()")
1029+
.columnOptions(ImmutableList.of("allow_commit_timestamp=TRUE"))
1030+
.endColumn()
1031+
.primaryKey().asc("id").end()
1032+
.endTable()
1033+
.build();
1034+
// spotless:on
1035+
1036+
createAndPopulate(ddl, 10);
1037+
runTest();
1038+
}
1039+
1040+
@Test
1041+
public void pgCommitTimestampColumns() throws Exception {
1042+
// spotless:off
1043+
Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL);
1044+
List<Export.DatabaseOption> dbOptionList = new ArrayList<>();
1045+
dbOptionList.add(
1046+
Export.DatabaseOption.newBuilder()
1047+
.setOptionName("default_sequence_kind")
1048+
.setOptionValue("\"bit_reversed_positive\"")
1049+
.build());
1050+
ddlBuilder.mergeDatabaseOptions(dbOptionList);
1051+
Ddl ddl = ddlBuilder
1052+
.createTable("CommitTimestampTable")
1053+
.column("id")
1054+
.int64()
1055+
.endColumn()
1056+
.column("default_commit_ts")
1057+
.pgSpannerCommitTimestamp()
1058+
.defaultExpression("spanner.pending_commit_timestamp()")
1059+
.endColumn()
1060+
.column("on_update_ts")
1061+
.pgSpannerCommitTimestamp()
1062+
.defaultExpression("spanner.pending_commit_timestamp()")
1063+
.onUpdateExpression("spanner.pending_commit_timestamp()")
1064+
.endColumn()
1065+
.primaryKey().asc("id").end()
1066+
.endTable()
1067+
.build();
1068+
// spotless:on
1069+
1070+
createAndPopulate(ddl, 10);
1071+
runTest();
1072+
}
1073+
10041074
@Test
10051075
public void udfs() throws Exception {
10061076
Ddl.Builder ddlBuilder = Ddl.builder();

0 commit comments

Comments
 (0)