@@ -0,0 +1,246 @@
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.failureinjectiontesting;

import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.User.USERS_TABLE;
import static com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.User.USERS_TABLE_MYSQL_DDL;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.datastream.v1.Stream;
import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.DataflowFailureInjector;
import com.google.cloud.teleport.v2.spanner.testutils.failureinjectiontesting.FuzzyCDCLoadGenerator;
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner;
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.conditions.ConditionCheck;
import org.apache.beam.it.gcp.cloudsql.CloudMySQLResourceManager;
import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager;
import org.apache.beam.it.gcp.dataflow.FlexTemplateDataflowJobResourceManager;
import org.apache.beam.it.gcp.datastream.DatastreamResourceManager;
import org.apache.beam.it.gcp.datastream.JDBCSource;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.apache.beam.it.jdbc.JDBCResourceManager;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* Test verifying correctness of CDC processing in the {@link DataStreamToSpanner} (DataStream to
* Spanner) template.
*/
@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class})
@TemplateIntegrationTest(DataStreamToSpanner.class)
@RunWith(JUnit4.class)
public class DataStreamToSpannerCDCFT extends DataStreamToSpannerFTBase {

private static final String SPANNER_DDL_RESOURCE = "DataStreamToSpannerCDCFT/spanner-schema.sql";

private static final int NUM_WORKERS = 10;
private static final int MAX_WORKERS = 20;

private static PipelineLauncher.LaunchInfo jobInfo;
private static SpannerResourceManager spannerResourceManager;
private static SpannerResourceManager shadowTableSpannerResourceManager;
private static GcsResourceManager gcsResourceManager;
private static PubsubResourceManager pubsubResourceManager;

private static CloudSqlResourceManager sourceDBResourceManager;
private JDBCSource sourceConnectionProfile;
private FuzzyCDCLoadGenerator cdcLoadGenerator;

/**
* Sets up resource managers and launches the Dataflow job before the test runs.
*
* @throws IOException if creating or uploading test resources fails
*/
@Before
public void setUp() throws IOException, InterruptedException {
// create Spanner Resources
spannerResourceManager = createSpannerDatabase(SPANNER_DDL_RESOURCE);
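// The template uses shadow tables to track already-processed change events; hosting them in a
// separate database exercises the shadowTableSpannerInstanceId/DatabaseId template parameters
// added in this change.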
shadowTableSpannerResourceManager =
SpannerResourceManager.builder("shadow-" + testName, PROJECT, REGION)
.maybeUseStaticInstance()
.build();
shadowTableSpannerResourceManager.ensureUsableAndCreateResources();
// create Source Resources
sourceDBResourceManager = CloudMySQLResourceManager.builder(testName).build();
sourceDBResourceManager.createTable(
USERS_TABLE, new JDBCResourceManager.JDBCSchema(USERS_TABLE_MYSQL_DDL, "id"));
sourceConnectionProfile =
createMySQLSourceConnectionProfile(sourceDBResourceManager, List.of(USERS_TABLE));

// create and upload GCS Resources
gcsResourceManager =
GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials)
.build();

// create pubsub manager
pubsubResourceManager = setUpPubSubResourceManager();

String testRootDir = getClass().getSimpleName();
// Create Pub/Sub subscriptions that notify the pipeline of new Datastream files in GCS
String gcsPrefix =
String.join("/", new String[] {testRootDir, gcsResourceManager.runId(), testName, "cdc"});
SubscriptionName subscription =
createPubsubResources(
testRootDir + testName, pubsubResourceManager, gcsPrefix, gcsResourceManager);

String dlqGcsPrefix =
String.join("/", new String[] {testRootDir, gcsResourceManager.runId(), testName, "dlq"});
SubscriptionName dlqSubscription =
createPubsubResources(
testRootDir + testName + "dlq",
pubsubResourceManager,
dlqGcsPrefix,
gcsResourceManager);
String artifactBucket = TestProperties.artifactBucket();

// Launch a Datastream stream replicating the source database into GCS
DatastreamResourceManager.Builder datastreamResourceManagerBuilder =
DatastreamResourceManager.builder(testName, PROJECT, REGION)
.setCredentialsProvider(credentialsProvider);
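// Optionally use a Datastream private connectivity configuration, supplied via the
// privateConnectivity system property.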
if (System.getProperty("privateConnectivity") != null) {
datastreamResourceManagerBuilder.setPrivateConnectivity(
System.getProperty("privateConnectivity"));
}
datastreamResourceManager = datastreamResourceManagerBuilder.build();
Stream stream =
createDataStreamResources(
artifactBucket, gcsPrefix, sourceConnectionProfile, datastreamResourceManager);

int numRows = 100;
int burstIterations = 10000;

// Generate a fuzzed CDC load on the source database
cdcLoadGenerator = new FuzzyCDCLoadGenerator();
cdcLoadGenerator.generateLoad(numRows, burstIterations, 0.5, sourceDBResourceManager);

FlexTemplateDataflowJobResourceManager.Builder flexTemplateBuilder =
FlexTemplateDataflowJobResourceManager.builder(testName)
.addEnvironmentVariable("numWorkers", NUM_WORKERS)
.addEnvironmentVariable("maxWorkers", MAX_WORKERS);

// launch dataflow template
jobInfo =
launchFwdDataflowJob(
spannerResourceManager,
gcsPrefix,
stream.getName(),
dlqGcsPrefix,
subscription.toString(),
dlqSubscription.toString(),
flexTemplateBuilder,
shadowTableSpannerResourceManager);
}

/**
* Cleans up all resources and resource managers.
*
* @throws IOException if resource cleanup fails
*/
@After
public void cleanUp() throws IOException {
ResourceManagerUtils.cleanResources(
spannerResourceManager,
sourceDBResourceManager,
datastreamResourceManager,
gcsResourceManager,
pubsubResourceManager);
}

@Test
public void liveMigrationCrossDbTxnCdcTest()
throws IOException, InterruptedException, SQLException, ExecutionException {

// Wait for Forward migration job to be in running state
assertThatPipeline(jobInfo).isRunning();

// Wait for at least 1 row to appear in Spanner
ConditionCheck conditionCheck =
SpannerRowsCheck.builder(spannerResourceManager, USERS_TABLE).setMinRows(1).build();
PipelineOperator.Result result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(20)), conditionCheck);
assertThatResult(result).meetsConditions();

// Kill the Dataflow workers multiple times to induce work item reassignment and, in turn,
// increase the chance of the same key being processed by multiple workers in parallel.
ConditionCheck workerFailureInjectorAsConditionCheck =
new ConditionCheck() {
@Override
protected @UnknownKeyFor @NonNull @Initialized String getDescription() {
return "Kill all workers for job " + jobInfo.jobId();
}

@Override
protected @UnknownKeyFor @NonNull @Initialized CheckResult check() {
try {
DataflowFailureInjector.abruptlyKillWorkers(jobInfo.projectId(), jobInfo.jobId());
} catch (Exception e) {
throw new RuntimeException(e);
}
return new CheckResult(true, "Killed all workers for job " + jobInfo.jobId());
}
};

long expectedRows = sourceDBResourceManager.getRowCount(USERS_TABLE);
// Wait for exact number of rows as source to appear in Spanner
ConditionCheck spannerRowCountConditionCheck =
SpannerRowsCheck.builder(spannerResourceManager, USERS_TABLE)
.setMinRows((int) expectedRows)
.setMaxRows((int) expectedRows)
.build();

// workerFailureInjector is implemented as a ConditionCheck so that the condition-check
// framework executes it every 30 seconds until the condition is met. Combining
// workerFailureInjectorAsConditionCheck with spannerRowCountConditionCheck means the
// worker-kill function keeps being called until all rows appear in Spanner, i.e., until the
// end of the migration.
conditionCheck = workerFailureInjectorAsConditionCheck.and(spannerRowCountConditionCheck);

result =
pipelineOperator()
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(20)), conditionCheck);
assertThatResult(result).meetsConditions();

// The Dataflow job usually finishes processing the events within 10 minutes. Allow an extra
// 10-minute buffer for it to drain before asserting the results.
Thread.sleep(600000);

// Read data from Spanner and assert that it exactly matches with SourceDb
cdcLoadGenerator.assertRows(spannerResourceManager, sourceDBResourceManager);
}
}
@@ -194,7 +194,8 @@ public PipelineLauncher.LaunchInfo launchFwdDataflowJob(
dlqGcsPrefix,
subscription.toString(),
dlqSubscription.toString(),
flexTemplateDataflowJobResourceManagerBuilder);
flexTemplateDataflowJobResourceManagerBuilder,
null);
assertThatPipeline(jobInfo).isRunning();
return jobInfo;
}
@@ -206,12 +207,13 @@ public PipelineLauncher.LaunchInfo launchFwdDataflowJob(
String dlqGcsPrefix,
String pubSubSubscription,
String dlqPubSubSubscription,
FlexTemplateDataflowJobResourceManager.Builder flexTemplateDataflowJobResourceManagerBuilder)
FlexTemplateDataflowJobResourceManager.Builder flexTemplateDataflowJobResourceManagerBuilder,
SpannerResourceManager shadowTableSpannerResourceManager)
throws IOException {
String artifactBucket = TestProperties.artifactBucket();

// launch dataflow template
FlexTemplateDataflowJobResourceManager flexTemplateDataflowJobResourceManager =
FlexTemplateDataflowJobResourceManager.Builder flexTemplateBuilder =
flexTemplateDataflowJobResourceManagerBuilder
.withTemplateName("Cloud_Datastream_to_Spanner")
.withTemplateModulePath("v2/datastream-to-spanner")
@@ -224,11 +226,17 @@ public PipelineLauncher.LaunchInfo launchFwdDataflowJob(
.addParameter("gcsPubSubSubscription", pubSubSubscription)
.addParameter("dlqGcsPubSubSubscription", dlqPubSubSubscription)
.addParameter("datastreamSourceType", "mysql")
.addParameter("inputFileFormat", "avro")
.build();
.addParameter("inputFileFormat", "avro");

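// If a dedicated shadow-table resource manager is provided, point the template's shadow
// tables at that separate Spanner instance and database.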
if (shadowTableSpannerResourceManager != null) {
flexTemplateBuilder.addParameter(
"shadowTableSpannerInstanceId", shadowTableSpannerResourceManager.getInstanceId());
flexTemplateBuilder.addParameter(
"shadowTableSpannerDatabaseId", shadowTableSpannerResourceManager.getDatabaseId());
}

// Run
PipelineLauncher.LaunchInfo jobInfo = flexTemplateDataflowJobResourceManager.launchJob();
PipelineLauncher.LaunchInfo jobInfo = flexTemplateBuilder.build().launchJob();
return jobInfo;
}

@@ -125,7 +125,8 @@ public void pubsubInvalidSubscriptionFITest() throws IOException, InterruptedExc
"dlqGcsPrefix",
subscriptionName,
subscriptionName,
flexTemplateBuilder);
flexTemplateBuilder,
null);
fail("Expected launch job to fail but it succeeded");
} catch (RuntimeException e) {
String jobId = extractJobIdFromError(e.getMessage());
@@ -211,7 +212,8 @@ public void pubsubDuplicateMessageDeliveryFITest() throws IOException, Interrupt
dlqGcsPrefix,
subscription.toString(),
dlqSubscription.toString(),
flexTemplateBuilder);
flexTemplateBuilder,
null);
assertThatPipeline(jobInfo).isRunning();

MySQLSrcDataProvider.writeRowsInSourceDB(1, 100, sourceDBResourceManager);
@@ -0,0 +1,9 @@
CREATE TABLE `Users` (
id INT64 NOT NULL,
first_name STRING(200),
last_name STRING(200),
age INT64,
status BOOL,
col1 INT64,
col2 INT64,
) PRIMARY KEY(id);