Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class FileSystems {
private static final Pattern FILE_SCHEME_PATTERN =
Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):/.*");
private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]");
private static final Pattern ESCAPED_GLOB_PATTERN = Pattern.compile("\\\\[*?{}]");
private static final String GLOB_ESCAPE_PREFIX = "\\";

private static final AtomicReference<KV<Long, Integer>> FILESYSTEM_REVISION =
new AtomicReference<>();
Expand All @@ -92,6 +94,40 @@ public static boolean hasGlobWildcard(String spec) {
return GLOB_PATTERN.matcher(spec).find();
}

/**
 * Returns a copy of {@code spec} in which every glob metacharacter is escaped.
 *
 * <p>Each occurrence of '*', '?', '{' or '}' is prefixed with a single backslash so that glob
 * matching treats it as a literal character of the file path rather than as a wildcard.
 *
 * <p>Example: {@code escapeGlobWildcards("file*.txt")} yields {@code "file\\*.txt"}
 *
 * @param spec the file path specification to escape; must not be null
 * @return the spec with all glob wildcard characters escaped
 */
public static String escapeGlobWildcards(String spec) {
  checkNotNull(spec, "spec cannot be null");
  StringBuilder escaped = new StringBuilder(spec.length());
  for (int i = 0; i < spec.length(); i++) {
    char c = spec.charAt(i);
    // Prefix each glob metacharacter with a backslash; copy everything else through unchanged.
    if (c == '*' || c == '?' || c == '{' || c == '}') {
      escaped.append('\\');
    }
    escaped.append(c);
  }
  return escaped.toString();
}

/**
 * Reverses {@link #escapeGlobWildcards(String)}: strips the backslash from every escaped glob
 * wildcard in the given spec.
 *
 * <p>Only the sequences {@code \*}, {@code \?}, {@code \{} and {@code \}} are rewritten (to
 * '*', '?', '{' and '}' respectively); any other backslash is left in place.
 *
 * <p>Example: {@code unescapeGlobWildcards("file\\*.txt")} yields {@code "file*.txt"}
 *
 * @param spec the file path specification to unescape; must not be null
 * @return the spec with escaped glob wildcard characters restored
 */
public static String unescapeGlobWildcards(String spec) {
  checkNotNull(spec, "spec cannot be null");
  StringBuilder unescaped = new StringBuilder(spec.length());
  int i = 0;
  while (i < spec.length()) {
    char c = spec.charAt(i);
    if (c == '\\' && i + 1 < spec.length()) {
      char next = spec.charAt(i + 1);
      // A backslash directly followed by a glob metacharacter is an escape sequence:
      // emit only the metacharacter and consume both characters.
      if (next == '*' || next == '?' || next == '{' || next == '}') {
        unescaped.append(next);
        i += 2;
        continue;
      }
    }
    unescaped.append(c);
    i++;
  }
  return unescaped.toString();
}

/**
* This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
* Callers should use {@link #match} to resolve users specs ambiguities before calling other
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,56 @@ private void createFileWithContent(Path path, String content) throws Exception {
}
}

@Test
public void testEscapeGlobWildcards() {
  // Each row is {input, expected escaped output}; cases cover every wildcard
  // character, a mixed run of wildcards, a wildcard-free path, and the empty string.
  String[][] cases = {
    {"file*.txt", "file\\*.txt"}, // asterisk
    {"file?.txt", "file\\?.txt"}, // question mark
    {"file{1,2}.txt", "file\\{1,2\\}.txt"}, // braces
    {"*?{}.txt", "\\*\\?\\{\\}.txt"}, // multiple wildcard characters
    {"file.txt", "file.txt"}, // no glob characters
    {"", ""}, // empty string
  };
  for (String[] testCase : cases) {
    assertEquals(testCase[1], FileSystems.escapeGlobWildcards(testCase[0]));
  }
}

@Test
public void testUnescapeGlobWildcards() {
  // Each row is {escaped input, expected unescaped output}; mirrors the escape test cases.
  String[][] cases = {
    {"file\\*.txt", "file*.txt"}, // asterisk
    {"file\\?.txt", "file?.txt"}, // question mark
    {"file\\{1,2\\}.txt", "file{1,2}.txt"}, // braces
    {"\\*\\?\\{\\}.txt", "*?{}.txt"}, // multiple escaped characters
    {"file.txt", "file.txt"}, // nothing escaped
    {"", ""}, // empty string
  };
  for (String[] testCase : cases) {
    assertEquals(testCase[1], FileSystems.unescapeGlobWildcards(testCase[0]));
  }
}

@Test
public void testEscapeUnescapeRoundTrip() {
  // Escaping followed by unescaping must reproduce the original spec exactly.
  String spec = "file*test?.txt";
  assertEquals(spec, FileSystems.unescapeGlobWildcards(FileSystems.escapeGlobWildcards(spec)));
}

private LocalResourceId toLocalResourceId(String str) throws Exception {
boolean isDirectory;
if (SystemUtils.IS_OS_WINDOWS) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3330,8 +3330,9 @@ public Write<T> withLoadJobProjectId(ValueProvider<String> loadJobProjectId) {
/**
* Choose the frequency at which file writes are triggered.
*
* <p>This is only applicable when the write method is set to {@link Method#FILE_LOADS} or
* {@link Method#STORAGE_WRITE_API}, and only when writing an unbounded {@link PCollection}.
* <p>This is only applicable when the write method is set to {@link Method#FILE_LOADS}, {@link
* Method#STORAGE_WRITE_API}, or {@link Method#STORAGE_API_AT_LEAST_ONCE}, and only when writing
* an unbounded {@link PCollection}.
*
* <p>Every triggeringFrequency duration, a BigQuery load job will be generated for all the data
* written since the last load job. BigQuery has limits on how many load jobs can be triggered
Expand Down Expand Up @@ -3736,19 +3737,22 @@ public WriteResult expand(PCollection<T> input) {
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
Write.Method method = resolveMethod(input);
if (input.isBounded() == IsBounded.UNBOUNDED) {
if (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API) {
if (method == Write.Method.FILE_LOADS
|| method == Write.Method.STORAGE_WRITE_API
|| method == Write.Method.STORAGE_API_AT_LEAST_ONCE) {
Duration triggeringFrequency =
(method == Write.Method.STORAGE_WRITE_API)
(method == Write.Method.STORAGE_WRITE_API
|| method == Write.Method.STORAGE_API_AT_LEAST_ONCE)
? getStorageApiTriggeringFrequency(bqOptions)
: getTriggeringFrequency();
checkArgument(
triggeringFrequency != null,
"When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, "
"When writing an unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, "
+ "triggering frequency must be specified");
} else {
checkArgument(
getTriggeringFrequency() == null,
"Triggering frequency can be specified only when writing via FILE_LOADS or STORAGE_WRITE_API, but the method was %s.",
"Triggering frequency can be specified only when writing via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE, but the method was %s.",
method);
}
if (method != Method.FILE_LOADS) {
Expand All @@ -3757,13 +3761,7 @@ public WriteResult expand(PCollection<T> input) {
"Number of file shards can be specified only when writing via FILE_LOADS, but the method was %s.",
method);
}
if (method == Method.STORAGE_API_AT_LEAST_ONCE
&& getStorageApiTriggeringFrequency(bqOptions) != null) {
LOG.warn(
"Storage API triggering frequency option will be ignored is it can only be specified only "
+ "when writing via STORAGE_WRITE_API, but the method was {}.",
method);
}

if (getAutoSharding()) {
if (method == Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) > 0) {
LOG.warn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePaylo
operationName,
dynamicDestinations,
bqServices,
false,
usesCdc,
options.getStorageApiAppendThresholdBytes(),
options.getStorageApiAppendThresholdRecordCount(),
options.getNumStorageWriteApiStreamAppendClients(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2632,7 +2632,8 @@ public void testStreamingWriteValidateFailsWithoutTriggeringFrequency() {
Method method = useStorageApi ? Method.STORAGE_WRITE_API : Method.FILE_LOADS;

thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API");
thrown.expectMessage(
"unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE");
thrown.expectMessage("triggering frequency must be specified");

p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null);
Expand All @@ -2646,6 +2647,72 @@ public void testStreamingWriteValidateFailsWithoutTriggeringFrequency() {
.withCreateDisposition(CreateDisposition.CREATE_NEVER));
}

// Verifies that writing an UNBOUNDED PCollection with STORAGE_API_AT_LEAST_ONCE and no
// triggering frequency fails pipeline validation with a clear error message.
@Test
public void testStreamingWriteValidateFailsWithoutTriggeringFrequencyForStorageApiAtLeastOnce() {
assumeTrue(useStreaming);
assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically
p.enableAbandonedNodeEnforcement(false);

// Expectations must be registered BEFORE the transform is applied, because validation
// runs during pipeline construction.
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"unbounded PCollection via FILE_LOADS, STORAGE_WRITE_API, or STORAGE_API_AT_LEAST_ONCE");
thrown.expectMessage("triggering frequency must be specified");

// Clear the pipeline-level triggering frequency so the unbounded write has none configured.
p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null);
p.apply(Create.empty(INPUT_RECORD_CODER))
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
.apply(
BigQueryIO.<InputRecord>write()
.withAvroFormatFunction(r -> new GenericData.Record(r.getSchema()))
.to("dataset.table")
.withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
.withCreateDisposition(CreateDisposition.CREATE_NEVER));
}

// Verifies the positive case: an UNBOUNDED STORAGE_API_AT_LEAST_ONCE write validates and runs
// when a storage-API triggering frequency IS configured.
@Test
public void testStreamingWriteValidateSucceedsWithTriggeringFrequencyForStorageApiAtLeastOnce() {
assumeTrue(useStreaming);
assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically
p.enableAbandonedNodeEnforcement(false);

// This should not throw - STORAGE_API_AT_LEAST_ONCE with triggering frequency should be valid
p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(30);
p.apply(Create.empty(INPUT_RECORD_CODER))
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
.apply(
BigQueryIO.<InputRecord>write()
.withAvroFormatFunction(r -> new GenericData.Record(r.getSchema()))
.to("dataset.table")
.withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withTestServices(fakeBqServices)
.withoutValidation());
// Should validate without throwing
p.run();
}

// Verifies that the triggering-frequency requirement applies only to UNBOUNDED input:
// a BOUNDED STORAGE_API_AT_LEAST_ONCE write must validate with no frequency configured.
@Test
public void testBoundedWriteValidateSucceedsWithoutTriggeringFrequencyForStorageApiAtLeastOnce() {
assumeTrue(!useStreaming); // Test bounded PCollection
assumeTrue(useStorageApiApproximate); // Test STORAGE_API_AT_LEAST_ONCE specifically

// Bounded collections should not require triggering frequency even for
// STORAGE_API_AT_LEAST_ONCE
p.getOptions().as(BigQueryOptions.class).setStorageWriteApiTriggeringFrequencySec(null);
p.apply(Create.empty(INPUT_RECORD_CODER))
.setIsBoundedInternal(PCollection.IsBounded.BOUNDED)
.apply(
BigQueryIO.<InputRecord>write()
.withAvroFormatFunction(r -> new GenericData.Record(r.getSchema()))
.to("dataset.table")
.withMethod(Method.STORAGE_API_AT_LEAST_ONCE)
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withTestServices(fakeBqServices)
.withoutValidation());
// Should validate without throwing
p.run();
}

@Test
public void testBigQueryIOGetName() {
assertEquals(
Expand Down Expand Up @@ -4924,4 +4991,49 @@ public void testCustomGcsTempLocationNull() throws Exception {
fakeDatasetService.getAllRows("project-id", "dataset-id", "table-id"),
containsInAnyOrder(new TableRow().set("name", "a"), new TableRow().set("name", "b")));
}

/**
 * Regression test for issue #31422: enabling CDC via {@code withRowMutationInformationFn}
 * together with STORAGE_WRITE_API previously failed with an IllegalStateException during
 * pipeline expansion. The pipeline here must build and run to completion.
 */
@Test
public void testCdcWithStorageWriteApiDoesNotThrowIllegalStateException() throws Exception {
  assumeTrue(useStorageApi);
  assumeTrue(!useStorageApiApproximate); // Exercise exactly-once STORAGE_WRITE_API specifically.

  TableSchema schema =
      new TableSchema()
          .setFields(
              ImmutableList.of(
                  new TableFieldSchema().setName("id").setType("INTEGER"),
                  new TableFieldSchema().setName("name").setType("STRING")));

  // CDC is enabled by supplying a row-mutation function; each element is an UPSERT keyed by
  // its "id" column.
  BigQueryIO.Write<Row> write =
      BigQueryIO.<Row>write()
          .to("project-id:dataset-id.table-id")
          .withSchema(schema)
          .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
          .withRowMutationInformationFn(
              (Row row) ->
                  RowMutationInformation.of(
                      RowMutationInformation.MutationType.UPSERT, row.getValue("id").toString()))
          .withTestServices(fakeBqServices)
          .withoutValidation();

  // Test data includes a second mutation for id=1 to exercise the update path.
  Schema beamSchema = Schema.builder().addInt32Field("id").addStringField("name").build();
  List<Row> testData =
      ImmutableList.of(
          Row.withSchema(beamSchema).addValues(1, "Alice").build(),
          Row.withSchema(beamSchema).addValues(2, "Bob").build(),
          Row.withSchema(beamSchema).addValues(1, "Alice Updated").build());

  // Expansion of this transform is what used to throw; the WriteResult itself is not inspected.
  p.apply(Create.of(testData).withRowSchema(beamSchema)).apply("WriteCdcToBQ", write);

  p.run(); // Should complete successfully without IllegalStateException.
}
}
Loading