Skip to content

Commit c3f3951

Browse files
craig[bot]HonoreDBXiang-Gumsbutler
committed
107572: changefeedccl: warn when using https sink urls r=[miretskiy] a=HonoreDB Changefeeds previously used http as their URI to align with backups. Backups no longer support http, and for changefeeds http is ambiguous, leading users to create cloudstorage sinks when they meant webhook sinks. This PR changes the scheme to file-http. Webhooks still use webhook-http. Fixes cockroachdb#98719. Release note (enterprise change): Changefeeds that create files over an http connection may now be specified via `INTO 'file-https://'` to disambiguate with `webhook-https`. 107993: sql: Deflake TestTxnObeysTableModificationTime r=Xiang-Gu a=Xiang-Gu This commit refactors and simplifies this test, and a local execution with `--race --stress` succeeded for at least 10 minutes. Fixes cockroachdb#107736 Fixes cockroachdb#107159 Epic: None Release note: None 108013: c2c: reduce TestStreamingAutoReplan runtime r=stevendanna a=msbutler In a suprising twist, cockroachdb#106853 deflaked this test, but also increased the runtime to 10 minutes. To understand why, consider that the default value of the stream_replication.replan_flow_frequency setting is 10 minutes, and if the user changes the cluster setting, the frequency will only take effect on the next replanning check. So, if a stream begins at t0, and the user changes the setting at t1, the replanning frequency will actually change 10 minutes after t0. In cockroachdb#105853 patch reduced stream_replication.replan_flow_frequency setting to 500 ms _after_ the stream began, which caused the next replanninng check to occur a full 10 minutes after the stream started! This patch changes this cluster setting _before_ the stream begins, reducing the runtime of this test to a speedy 10 seconds. Epic: none Release note: none Co-authored-by: Aaron Zinger <[email protected]> Co-authored-by: Xiang Gu <[email protected]> Co-authored-by: Michael Butler <[email protected]>
4 parents 9355e10 + a428c45 + 9b74331 + 602595e commit c3f3951

File tree

10 files changed

+117
-204
lines changed

10 files changed

+117
-204
lines changed

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,25 @@ func validateSink(
904904
if err != nil {
905905
return err
906906
}
907+
u, err := url.Parse(details.SinkURI)
908+
if err != nil {
909+
return err
910+
}
911+
912+
ambiguousSchemes := map[string][2]string{
913+
changefeedbase.DeprecatedSinkSchemeHTTP: {changefeedbase.SinkSchemeCloudStorageHTTP, changefeedbase.SinkSchemeWebhookHTTP},
914+
changefeedbase.DeprecatedSinkSchemeHTTPS: {changefeedbase.SinkSchemeCloudStorageHTTPS, changefeedbase.SinkSchemeWebhookHTTPS},
915+
}
916+
917+
if disambiguations, isAmbiguous := ambiguousSchemes[u.Scheme]; isAmbiguous {
918+
p.BufferClientNotice(ctx, pgnotice.Newf(
919+
`Interpreting deprecated URI scheme %s as %s. For webhook semantics, use %s.`,
920+
u.Scheme,
921+
disambiguations[0],
922+
disambiguations[1],
923+
))
924+
}
925+
907926
var nilOracle timestampLowerBoundOracle
908927
canarySink, err := getAndDialSink(ctx, &p.ExecCfg().DistSQLSrv.ServerConfig, details,
909928
nilOracle, p.User(), jobID, sli)

pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4828,7 +4828,7 @@ func TestChangefeedErrors(t *testing.T) {
48284828
`CREATE CHANGEFEED FOR foo INTO $1`, `webhook-https://fake-host?ca_cert=Zm9v`,
48294829
)
48304830
sqlDB.ExpectErr(
4831-
t, `sink requires https`,
4831+
t, `sink requires webhook-https`,
48324832
`CREATE CHANGEFEED FOR foo INTO $1`, `webhook-http://fake-host`,
48334833
)
48344834
sqlDB.ExpectErr(

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,11 @@ const (
162162
DeprecatedSinkSchemeCloudStorageNodelocal = `experimental-nodelocal`
163163
DeprecatedSinkSchemeCloudStorageS3 = `experimental-s3`
164164

165+
// DeprecatedSinkSchemeHTTP is interpreted as cloudstorage over HTTP PUT.
166+
DeprecatedSinkSchemeHTTP = `http`
167+
// DeprecatedSinkSchemeHTTPS is interpreted as cloudstorage over HTTPS PUT.
168+
DeprecatedSinkSchemeHTTPS = `https`
169+
165170
// OptKafkaSinkConfig is a JSON configuration for kafka sink (kafkaSinkConfig).
166171
OptKafkaSinkConfig = `kafka_sink_config`
167172
OptPubsubSinkConfig = `pubsub_sink_config`
@@ -186,13 +191,11 @@ const (
186191
SinkParamTopicName = `topic_name`
187192
SinkSchemeCloudStorageAzure = `azure`
188193
SinkSchemeCloudStorageGCS = `gs`
189-
SinkSchemeCloudStorageHTTP = `http`
190-
SinkSchemeCloudStorageHTTPS = `https`
194+
SinkSchemeCloudStorageHTTP = `file-http`
195+
SinkSchemeCloudStorageHTTPS = `file-https`
191196
SinkSchemeCloudStorageNodelocal = `nodelocal`
192197
SinkSchemeCloudStorageS3 = `s3`
193198
SinkSchemeExperimentalSQL = `experimental-sql`
194-
SinkSchemeHTTP = `http`
195-
SinkSchemeHTTPS = `https`
196199
SinkSchemeKafka = `kafka`
197200
SinkSchemeNull = `null`
198201
SinkSchemeWebhookHTTP = `webhook-http`

pkg/ccl/changefeedccl/sink_cloudstorage.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ func isCloudStorageSink(u *url.URL) bool {
4949
changefeedbase.SinkSchemeCloudStorageNodelocal, changefeedbase.SinkSchemeCloudStorageHTTP,
5050
changefeedbase.SinkSchemeCloudStorageHTTPS, changefeedbase.SinkSchemeCloudStorageAzure:
5151
return true
52+
// During the deprecation period, we need to keep parsing these as cloudstorage for backwards
53+
// compatibility. Afterwards we'll either remove them or move them to webhook.
54+
case changefeedbase.DeprecatedSinkSchemeHTTP, changefeedbase.DeprecatedSinkSchemeHTTPS:
55+
return true
5256
default:
5357
return false
5458
}
@@ -375,6 +379,7 @@ func makeCloudStorageSink(
375379
}
376380
}
377381
u.Scheme = strings.TrimPrefix(u.Scheme, `experimental-`)
382+
u.Scheme = strings.TrimPrefix(u.Scheme, `file-`)
378383

379384
sinkID := atomic.AddInt64(&cloudStorageSinkIDAtomic, 1)
380385
sessID, err := generateChangefeedSessionID()

pkg/ccl/changefeedccl/sink_external_connection.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ var supportedExternalConnectionTypes = map[string]connectionpb.ConnectionProvide
9292
GcpScheme: connectionpb.ConnectionProvider_gcpubsub,
9393
changefeedbase.SinkSchemeCloudStorageHTTP: connectionpb.ConnectionProvider_http,
9494
changefeedbase.SinkSchemeCloudStorageHTTPS: connectionpb.ConnectionProvider_https,
95+
changefeedbase.DeprecatedSinkSchemeHTTP: connectionpb.ConnectionProvider_http,
96+
changefeedbase.DeprecatedSinkSchemeHTTPS: connectionpb.ConnectionProvider_https,
9597
changefeedbase.SinkSchemeCloudStorageNodelocal: connectionpb.ConnectionProvider_nodelocal,
9698
changefeedbase.SinkSchemeCloudStorageS3: connectionpb.ConnectionProvider_s3,
9799
changefeedbase.SinkSchemeKafka: connectionpb.ConnectionProvider_kafka,

pkg/ccl/changefeedccl/sink_webhook.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ func makeDeprecatedWebhookSink(
233233
mb metricsRecorderBuilder,
234234
) (Sink, error) {
235235
if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS {
236-
return nil, errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS)
236+
return nil, errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeWebhookHTTPS)
237237
}
238238
u.Scheme = strings.TrimPrefix(u.Scheme, `webhook-`)
239239

pkg/ccl/changefeedccl/sink_webhook_v2.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func validateWebhookOpts(
229229
u sinkURL, encodingOpts changefeedbase.EncodingOptions, opts changefeedbase.WebhookSinkOptions,
230230
) error {
231231
if u.Scheme != changefeedbase.SinkSchemeWebhookHTTPS {
232-
return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeHTTPS)
232+
return errors.Errorf(`this sink requires %s`, changefeedbase.SinkSchemeWebhookHTTPS)
233233
}
234234

235235
switch encodingOpts.Format {

pkg/ccl/streamingccl/streamingest/replication_stream_e2e_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,7 @@ func TestStreamingAutoReplan(t *testing.T) {
729729
defer cleanup()
730730
// Don't allow for replanning until the new nodes and scattered table have been created.
731731
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_threshold", 0)
732+
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_frequency", time.Millisecond*500)
732733

733734
// Begin the job on a single source node.
734735
producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
@@ -746,7 +747,6 @@ func TestStreamingAutoReplan(t *testing.T) {
746747

747748
// Configure the ingestion job to replan eagerly.
748749
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_threshold", 0.1)
749-
serverutils.SetClusterSetting(t, c.DestCluster, "stream_replication.replan_flow_frequency", time.Millisecond*500)
750750

751751
// The ingestion job should eventually retry because it detects new nodes to add to the plan.
752752
require.Error(t, <-retryErrorChan, sql.ErrPlanChanged)

pkg/sql/catalog/lease/BUILD.bazel

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ go_test(
7777
embed = [":lease"],
7878
deps = [
7979
"//pkg/base",
80+
"//pkg/jobs",
8081
"//pkg/jobs/jobspb",
8182
"//pkg/keys",
8283
"//pkg/kv",
@@ -105,7 +106,6 @@ go_test(
105106
"//pkg/sql/rowenc/keyside",
106107
"//pkg/sql/sem/tree",
107108
"//pkg/sql/sessiondata",
108-
"//pkg/sql/sqltestutils",
109109
"//pkg/sql/tests",
110110
"//pkg/sql/types",
111111
"//pkg/storage",
@@ -114,7 +114,6 @@ go_test(
114114
"//pkg/testutils/skip",
115115
"//pkg/testutils/sqlutils",
116116
"//pkg/testutils/testcluster",
117-
"//pkg/util",
118117
"//pkg/util/admission",
119118
"//pkg/util/encoding",
120119
"//pkg/util/hlc",

0 commit comments

Comments
 (0)