From 354a3601ff5b82f465beaf392e4ea36d0a3b75dc Mon Sep 17 00:00:00 2001 From: wuzhuanhong Date: Wed, 19 Nov 2025 17:17:04 +0800 Subject: [PATCH 1/2] chore(kafka): use new API to replace history API and improve test --- go.mod | 2 +- go.sum | 4 +- ...rce_huaweicloud_dms_kafka_instance_test.go | 115 +++++++++++++----- ...resource_huaweicloud_dms_kafka_instance.go | 10 +- .../dms/v2/kafka/instances/requests.go | 37 ++++++ .../openstack/dms/v2/kafka/instances/urls.go | 8 ++ vendor/modules.txt | 2 +- 7 files changed, 136 insertions(+), 42 deletions(-) diff --git a/go.mod b/go.mod index 84e9782dbf6..7cb83807f48 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.18 require ( github.com/GehirnInc/crypt v0.0.0-20200316065508-bb7000b8a962 - github.com/chnsz/golangsdk v0.0.0-20251119014205-0fd4ae2d727b + github.com/chnsz/golangsdk v0.0.0-20251121031605-47043ab44ff7 github.com/google/go-cmp v0.6.0 github.com/hashicorp/go-cleanhttp v0.5.2 github.com/hashicorp/go-cty v1.4.1-0.20200414143053-d3edf31b6320 diff --git a/go.sum b/go.sum index b190b895c24..5fa06db8571 100644 --- a/go.sum +++ b/go.sum @@ -22,8 +22,8 @@ github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6 github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/chnsz/golangsdk v0.0.0-20251119014205-0fd4ae2d727b h1:+5U4xfrnMRUQkteDBSClO+YHKa14xX+Fw9zHw/8+h7A= -github.com/chnsz/golangsdk v0.0.0-20251119014205-0fd4ae2d727b/go.mod h1:PIan4aDeDoNOI4FImVS8osVpShcVnAUKR2XM8Ulgsgw= +github.com/chnsz/golangsdk v0.0.0-20251121031605-47043ab44ff7 h1:CFHtM054TXBC5tBBKEAK+MAVCfsn/BtLFwQG0tJwxkM= +github.com/chnsz/golangsdk v0.0.0-20251121031605-47043ab44ff7/go.mod h1:PIan4aDeDoNOI4FImVS8osVpShcVnAUKR2XM8Ulgsgw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= diff --git a/huaweicloud/services/acceptance/kafka/resource_huaweicloud_dms_kafka_instance_test.go b/huaweicloud/services/acceptance/kafka/resource_huaweicloud_dms_kafka_instance_test.go index b778d980fa4..e74352fbe2a 100644 --- a/huaweicloud/services/acceptance/kafka/resource_huaweicloud_dms_kafka_instance_test.go +++ b/huaweicloud/services/acceptance/kafka/resource_huaweicloud_dms_kafka_instance_test.go @@ -107,24 +107,42 @@ func TestAccKafkaInstance_newFormat(t *testing.T) { Check: resource.ComposeTestCheckFunc( rc.CheckResourceExists(), resource.TestCheckResourceAttr(resourceName, "name", rName), - resource.TestCheckResourceAttr(resourceName, "engine", "kafka"), - resource.TestCheckResourceAttr(resourceName, "security_protocol", "SASL_PLAINTEXT"), - resource.TestCheckResourceAttr(resourceName, "enabled_mechanisms.0", "SCRAM-SHA-512"), - resource.TestCheckResourceAttrPair(resourceName, "broker_num", - "data.huaweicloud_dms_kafka_flavors.test", "flavors.0.properties.0.min_broker"), - resource.TestCheckResourceAttrPair(resourceName, "flavor_id", - "data.huaweicloud_dms_kafka_flavors.test", "flavors.0.id"), + resource.TestCheckResourceAttr(resourceName, "engine_version", "2.7"), resource.TestCheckResourceAttrPair(resourceName, "storage_spec_code", "data.huaweicloud_dms_kafka_flavors.test", "flavors.0.ios.0.storage_spec_code"), - resource.TestCheckResourceAttr(resourceName, "broker_num", "3"), + resource.TestCheckResourceAttrPair(resourceName, "vpc_id", + "huaweicloud_vpc.test", "id"), + resource.TestCheckResourceAttrPair(resourceName, "security_group_id", + "huaweicloud_networking_secgroup.test", "id"), + resource.TestCheckResourceAttrPair(resourceName, "network_id", + "huaweicloud_vpc_subnet.test", "id"), + resource.TestMatchResourceAttr(resourceName, "availability_zones.#", regexp.MustCompile(`^[1-9]([0-9]*)?$`)), resource.TestCheckResourceAttr(resourceName, "arch_type", "X86"), - - resource.TestCheckResourceAttr(resourceName, "cross_vpc_accesses.1.advertised_ip", "www.terraform-test.com"), - resource.TestCheckResourceAttr(resourceName, "cross_vpc_accesses.2.advertised_ip", "192.168.0.53"), + resource.TestCheckResourceAttrPair(resourceName, "flavor_id", + "data.huaweicloud_dms_kafka_flavors.test", "flavors.0.id"), + resource.TestCheckResourceAttr(resourceName, "broker_num", "3"), + resource.TestCheckResourceAttr(resourceName, "access_user", "user"), resource.TestCheckResourceAttr(resourceName, "parameters.0.name", "log.retention.hours"), resource.TestCheckResourceAttr(resourceName, "parameters.0.value", "48"), + resource.TestCheckResourceAttr(resourceName, "security_protocol", "SASL_PLAINTEXT"), + resource.TestCheckResourceAttr(resourceName, "enabled_mechanisms.0", "SCRAM-SHA-512"), + resource.TestCheckResourceAttr(resourceName, "cross_vpc_accesses.1.advertised_ip", "www.terraform-test.com"), + resource.TestCheckResourceAttr(resourceName, "cross_vpc_accesses.2.advertised_ip", "192.168.0.53"), + resource.TestCheckResourceAttrSet(resourceName, "storage_space"), resource.TestCheckResourceAttrSet(resourceName, "maintain_begin"), resource.TestCheckResourceAttrSet(resourceName, "maintain_end"), + // Check attributes. + resource.TestCheckResourceAttr(resourceName, "engine", "kafka"), + resource.TestMatchResourceAttr(resourceName, "partition_num", regexp.MustCompile(`^[1-9]([0-9]*)?$`)), + resource.TestCheckResourceAttrSet(resourceName, "connect_address"), + resource.TestCheckResourceAttrSet(resourceName, "port"), + resource.TestCheckResourceAttrSet(resourceName, "status"), + resource.TestCheckResourceAttrSet(resourceName, "storage_type"), + resource.TestCheckResourceAttrSet(resourceName, "created_at"), + resource.TestCheckResourceAttr(resourceName, "is_logical_volume", "true"), + resource.TestCheckResourceAttrSet(resourceName, "node_num"), + resource.TestCheckResourceAttrSet(resourceName, "pod_connect_address"), + resource.TestCheckResourceAttr(resourceName, "type", "cluster"), ), }, { @@ -139,13 +157,12 @@ func TestAccKafkaInstance_newFormat(t *testing.T) { resource.TestCheckResourceAttrPair(resourceName, "storage_spec_code", "data.huaweicloud_dms_kafka_flavors.test", "flavors.0.ios.0.storage_spec_code"), resource.TestCheckResourceAttr(resourceName, "storage_space", "600"), - + resource.TestCheckResourceAttr(resourceName, "parameters.0.name", "auto.create.groups.enable"), + resource.TestCheckResourceAttr(resourceName, "parameters.0.value", "false"), resource.TestCheckResourceAttr(resourceName, "cross_vpc_accesses.0.advertised_ip", "192.168.0.61"), resource.TestCheckResourceAttr(resourceName, "cross_vpc_accesses.1.advertised_ip", "test.terraform.com"), resource.TestCheckResourceAttr(resourceName, "cross_vpc_accesses.2.advertised_ip", "192.168.0.62"), resource.TestCheckResourceAttr(resourceName, "cross_vpc_accesses.3.advertised_ip", "192.168.0.63"), - resource.TestCheckResourceAttr(resourceName, "parameters.0.name", "auto.create.groups.enable"), - resource.TestCheckResourceAttr(resourceName, "parameters.0.value", "false"), resource.TestCheckResourceAttr(resourceName, "maintain_begin", "06:00:00"), resource.TestCheckResourceAttr(resourceName, "maintain_end", "10:00:00"), ), @@ -166,7 +183,10 @@ func TestAccKafkaInstance_publicIp(t *testing.T) { password := acceptance.RandomPassword() resource.ParallelTest(t, resource.TestCase{ - PreCheck: func() { acceptance.TestAccPreCheck(t) }, + PreCheck: func() { + acceptance.TestAccPreCheck(t) + acceptance.TestAccPreCheckMigrateEpsID(t) + }, ProviderFactories: acceptance.TestAccProviderFactories, CheckDestroy: rc.CheckResourceDestroy(), Steps: []resource.TestStep{ @@ -177,6 +197,10 @@ func TestAccKafkaInstance_publicIp(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "name", rName), resource.TestCheckResourceAttr(resourceName, "broker_num", "3"), resource.TestCheckResourceAttr(resourceName, "public_ip_ids.#", "3"), + resource.TestCheckResourceAttr(resourceName, "dumping", "true"), + resource.TestCheckResourceAttr(resourceName, "new_tenant_ips.#", "2"), + resource.TestCheckResourceAttr(resourceName, "new_tenant_ips.0", "192.168.0.20"), + resource.TestCheckResourceAttr(resourceName, "new_tenant_ips.1", "192.168.0.18"), resource.TestCheckResourceAttr(resourceName, "ssl_enable", "false"), resource.TestCheckResourceAttr(resourceName, "port_protocol.0.private_plain_enable", "true"), resource.TestCheckResourceAttr(resourceName, "port_protocol.0.public_plain_enable", "true"), @@ -184,6 +208,16 @@ func TestAccKafkaInstance_publicIp(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "port_protocol.0.public_sasl_ssl_enable", "false"), resource.TestCheckResourceAttr(resourceName, "port_protocol.0.private_sasl_plaintext_enable", "false"), resource.TestCheckResourceAttr(resourceName, "port_protocol.0.public_sasl_plaintext_enable", "false"), + resource.TestCheckResourceAttr(resourceName, "enable_auto_topic", "true"), + resource.TestCheckResourceAttr(resourceName, "vpc_client_plain", "true"), + resource.TestCheckResourceAttr(resourceName, "description", "Created by Terraform script"), + resource.TestCheckResourceAttr(resourceName, "enterprise_project_id", acceptance.HW_ENTERPRISE_PROJECT_ID_TEST), + resource.TestCheckResourceAttr(resourceName, "retention_policy", "time_base"), + // Check attributes. + resource.TestCheckResourceAttr(resourceName, "enable_public_ip", "true"), + resource.TestCheckResourceAttr(resourceName, "public_ip_address.#", "3"), + resource.TestCheckResourceAttrSet(resourceName, "connector_id"), + resource.TestCheckResourceAttrSet(resourceName, "connector_node_num"), ), }, { @@ -198,11 +232,17 @@ func TestAccKafkaInstance_publicIp(t *testing.T) { resource.TestCheckResourceAttr(resourceName, "name", rName), resource.TestCheckResourceAttr(resourceName, "broker_num", "4"), resource.TestCheckResourceAttr(resourceName, "public_ip_ids.#", "4"), + resource.TestCheckResourceAttr(resourceName, "new_tenant_ips.#", "1"), + resource.TestCheckResourceAttr(resourceName, "new_tenant_ips.0", "192.168.0.79"), resource.TestCheckResourceAttr(resourceName, "ssl_enable", "true"), resource.TestCheckResourceAttr(resourceName, "port_protocol.0.private_plain_enable", "false"), resource.TestCheckResourceAttr(resourceName, "port_protocol.0.private_sasl_ssl_enable", "true"), resource.TestCheckResourceAttr(resourceName, "port_protocol.0.public_plain_enable", "false"), resource.TestCheckResourceAttr(resourceName, "port_protocol.0.public_sasl_ssl_enable", "true"), + resource.TestCheckResourceAttr(resourceName, "enable_auto_topic", "false"), + resource.TestCheckResourceAttr(resourceName, "description", ""), + resource.TestCheckResourceAttr(resourceName, "enterprise_project_id", acceptance.HW_ENTERPRISE_MIGRATE_PROJECT_ID_TEST), + resource.TestCheckResourceAttr(resourceName, "retention_policy", "produce_reject"), ), }, { @@ -479,18 +519,24 @@ resource "huaweicloud_dms_kafka_instance" "test" { storage_spec_code = local.flavor.ios[0].storage_spec_code availability_zones = slice(data.huaweicloud_availability_zones.test.names, 0, 3) - engine_version = "2.7" - storage_space = 300 - broker_num = 3 - arch_type = "X86" - public_ip_ids = huaweicloud_vpc_eip.test[*].id - new_tenant_ips = ["192.168.0.20", "192.168.0.18"] + engine_version = "2.7" + storage_space = 300 + broker_num = 3 + arch_type = "X86" + public_ip_ids = huaweicloud_vpc_eip.test[*].id + new_tenant_ips = ["192.168.0.20", "192.168.0.18"] + dumping = true + description = "Created by Terraform script" + enable_auto_topic = true + vpc_client_plain = true + enterprise_project_id = "%[4]s" + retention_policy = "time_base" port_protocol { private_plain_enable = true public_plain_enable = true } -}`, common.TestBaseNetwork(rName), testAccKafkaInstance_publicIpBase(3), rName) +}`, common.TestBaseNetwork(rName), testAccKafkaInstance_publicIpBase(3), rName, acceptance.HW_ENTERPRISE_PROJECT_ID_TEST) } func testAccKafkaInstance_publicIp_update(rName, password string, brokerNum int) string { @@ -520,16 +566,19 @@ resource "huaweicloud_dms_kafka_instance" "test" { storage_spec_code = local.flavor.ios[0].storage_spec_code availability_zones = slice(data.huaweicloud_availability_zones.test.names, 0, 3) - engine_version = "2.7" - storage_space = 600 - broker_num = %d - arch_type = "X86" - new_tenant_ips = ["192.168.0.79"] - public_ip_ids = huaweicloud_vpc_eip.test[*].id - access_user = "test" - password = "%[5]s" - - enabled_mechanisms = ["SCRAM-SHA-512"] + engine_version = "2.7" + storage_space = 600 + broker_num = %d + arch_type = "X86" + new_tenant_ips = ["192.168.0.79"] + dumping = true + public_ip_ids = huaweicloud_vpc_eip.test[*].id + access_user = "test" + password = "%[5]s" + enabled_mechanisms = ["SCRAM-SHA-512"] + enable_auto_topic = false + enterprise_project_id = "%[6]s" + retention_policy = "produce_reject" port_protocol { private_plain_enable = false @@ -537,5 +586,5 @@ resource "huaweicloud_dms_kafka_instance" "test" { public_plain_enable = false public_sasl_ssl_enable = true } -}`, common.TestBaseNetwork(rName), testAccKafkaInstance_publicIpBase(4), rName, brokerNum, password) +}`, common.TestBaseNetwork(rName), testAccKafkaInstance_publicIpBase(4), rName, brokerNum, password, acceptance.HW_ENTERPRISE_MIGRATE_PROJECT_ID_TEST) } diff --git a/huaweicloud/services/kafka/resource_huaweicloud_dms_kafka_instance.go b/huaweicloud/services/kafka/resource_huaweicloud_dms_kafka_instance.go index eeafa820dd4..15f880d8d12 100644 --- a/huaweicloud/services/kafka/resource_huaweicloud_dms_kafka_instance.go +++ b/huaweicloud/services/kafka/resource_huaweicloud_dms_kafka_instance.go @@ -36,11 +36,11 @@ const engineKafka = "kafka" // @API Kafka GET /v2/available-zones // @API Kafka POST /v2/{project_id}/instances/{instance_id}/crossvpc/modify -// @API Kafka POST /v2/{project_id}/instances/{instance_id}/extend +// @API Kafka POST /v2/{project_id}/kafka/instances/{instance_id}/extend // @API Kafka DELETE /v2/{project_id}/instances/{instance_id} // @API Kafka GET /v2/{project_id}/instances/{instance_id} // @API Kafka PUT /v2/{project_id}/instances/{instance_id} -// @API Kafka POST /v2/{engine}/{project_id}/instances +// @API Kafka POST /v2/{project_id}/kafka/instances // @API Kafka GET /v2/{project_id}/kafka/{instance_id}/tags // @API Kafka POST /v2/{project_id}/kafka/{instance_id}/tags/action // @API Kafka POST /v2/{project_id}/instances/{instance_id}/autotopic @@ -919,7 +919,7 @@ func createKafkaInstanceWithFlavor(ctx context.Context, d *schema.ResourceData, createOpts.Password = password createOpts.KafkaManagerPassword = d.Get("manager_password").(string) - kafkaInstance, err := instances.CreateWithEngine(client, createOpts, engineKafka).Extract() + kafkaInstance, err := instances.CreateInstance(client, createOpts).Extract() if err != nil { return diag.Errorf("error creating Kafka instance: %s", err) } @@ -1063,7 +1063,7 @@ func createKafkaInstanceWithProductID(ctx context.Context, d *schema.ResourceDat createOpts.Password = password createOpts.KafkaManagerPassword = d.Get("manager_password").(string) - kafkaInstance, err := instances.CreateWithEngine(client, createOpts, engineKafka).Extract() + kafkaInstance, err := instances.CreateInstance(client, createOpts).Extract() if err != nil { return diag.Errorf("error creating DMS kafka instance: %s", err) } @@ -1752,7 +1752,7 @@ func resizeKafkaInstanceStorage(ctx context.Context, d *schema.ResourceData, cli func doKafkaInstanceResize(ctx context.Context, d *schema.ResourceData, client *golangsdk.ServiceClient, opts instances.ResizeInstanceOpts) error { retryFunc := func() (interface{}, bool, error) { - _, err := instances.Resize(client, d.Id(), opts) + _, err := instances.ExtendInstance(client, d.Id(), opts) retry, err := handleMultiOperationsError(err) return nil, retry, err } diff --git a/vendor/github.com/chnsz/golangsdk/openstack/dms/v2/kafka/instances/requests.go b/vendor/github.com/chnsz/golangsdk/openstack/dms/v2/kafka/instances/requests.go index 5a33bae2e99..4f7c6be112c 100644 --- a/vendor/github.com/chnsz/golangsdk/openstack/dms/v2/kafka/instances/requests.go +++ b/vendor/github.com/chnsz/golangsdk/openstack/dms/v2/kafka/instances/requests.go @@ -242,6 +242,20 @@ func CreateWithEngine(client *golangsdk.ServiceClient, ops CreateOpsBuilder, eng return } +func CreateInstance(client *golangsdk.ServiceClient, ops CreateOpsBuilder) (r CreateResult) { + b, err := ops.ToInstanceCreateMap() + if err != nil { + r.Err = err + return + } + + _, r.Err = client.Post(createInstanceURL(client), b, &r.Body, &golangsdk.RequestOpts{ + OkCodes: []int{200}, + }) + + return +} + // Delete an instance by id func Delete(client *golangsdk.ServiceClient, id string) (r DeleteResult) { _, r.Err = client.Delete(deleteURL(client, id), &golangsdk.RequestOpts{ @@ -386,6 +400,29 @@ func Resize(client *golangsdk.ServiceClient, id string, opts ResizeInstanceOpts) return "", err } +func ExtendInstance(client *golangsdk.ServiceClient, instanceId string, opts ResizeInstanceOpts) (string, error) { + b, err := golangsdk.BuildRequestBody(opts, "") + if err != nil { + return "", err + } + + var rst golangsdk.Result + _, err = client.Post(extendInstanceURL(client, instanceId), b, &rst.Body, &golangsdk.RequestOpts{ + MoreHeaders: requestOpts.MoreHeaders, + }) + + if err == nil { + var r struct { + JobID string `json:"job_id"` + } + if err = rst.ExtractInto(&r); err != nil { + return "", err + } + return r.JobID, nil + } + return "", err +} + // CrossVpcUpdateOpts is the structure required by the UpdateCrossVpc method to update the internal IP address for // cross-VPC access. type CrossVpcUpdateOpts struct { diff --git a/vendor/github.com/chnsz/golangsdk/openstack/dms/v2/kafka/instances/urls.go b/vendor/github.com/chnsz/golangsdk/openstack/dms/v2/kafka/instances/urls.go index 6bcf5d89cec..d9a7c515846 100644 --- a/vendor/github.com/chnsz/golangsdk/openstack/dms/v2/kafka/instances/urls.go +++ b/vendor/github.com/chnsz/golangsdk/openstack/dms/v2/kafka/instances/urls.go @@ -14,6 +14,10 @@ func createURLWithEngine(engine string, client *golangsdk.ServiceClient) string return client.ServiceURL(engine, client.ProjectID, resourcePath) } +func createInstanceURL(client *golangsdk.ServiceClient) string { + return client.ServiceURL(client.ProjectID, "kafka", resourcePath) +} + // deleteURL will build the url of deletion func deleteURL(client *golangsdk.ServiceClient, id string) string { return client.ServiceURL(client.ProjectID, resourcePath, id) @@ -37,6 +41,10 @@ func extend(client *golangsdk.ServiceClient, id string) string { return client.ServiceURL(client.ProjectID, resourcePath, id, "extend") } +func extendInstanceURL(client *golangsdk.ServiceClient, instanceId string) string { + return client.ServiceURL(client.ProjectID, "kafka", resourcePath, instanceId, "extend") +} + func crossVpcURL(client *golangsdk.ServiceClient, id string) string { return client.ServiceURL(client.ProjectID, resourcePath, id, "crossvpc/modify") } diff --git a/vendor/modules.txt b/vendor/modules.txt index 323ba58bdbb..e36bf01c080 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -13,7 +13,7 @@ github.com/apparentlymart/go-cidr/cidr # github.com/apparentlymart/go-textseg/v13 v13.0.0 ## explicit; go 1.16 github.com/apparentlymart/go-textseg/v13/textseg -# github.com/chnsz/golangsdk v0.0.0-20251119014205-0fd4ae2d727b +# github.com/chnsz/golangsdk v0.0.0-20251121031605-47043ab44ff7 ## explicit; go 1.14 github.com/chnsz/golangsdk github.com/chnsz/golangsdk/auth From 7bc92f8857aab1e190b397daf97dc9fef5424a5e Mon Sep 17 00:00:00 2001 From: wuzhuanhong Date: Mon, 24 Nov 2025 15:05:42 +0800 Subject: [PATCH 2/2] fix(kafka/instance): fix an error when updating storage_space --- .../resource_huaweicloud_dms_kafka_instance.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/huaweicloud/services/kafka/resource_huaweicloud_dms_kafka_instance.go b/huaweicloud/services/kafka/resource_huaweicloud_dms_kafka_instance.go index 15f880d8d12..733ed360cac 100644 --- a/huaweicloud/services/kafka/resource_huaweicloud_dms_kafka_instance.go +++ b/huaweicloud/services/kafka/resource_huaweicloud_dms_kafka_instance.go @@ -1729,7 +1729,21 @@ func resizeKafkaInstance(ctx context.Context, d *schema.ResourceData, meta inter } } + // `storage_space` equals `broker_num` multiplied by the storage space of each broker. + // The `storage_space` value is related to the `broker_num` value. + // After broker_num changes, we need to obtain the latest storage_space and compare it with the current storage_space. if d.HasChanges("storage_space") { + resp, err := instances.Get(client, d.Id()).Extract() + if err != nil { + return fmt.Errorf("error getting Kafka instance: %s", err) + } + + newStorageSpace := d.Get("storage_space").(int) + if resp.TotalStorageSpace >= newStorageSpace { + log.Printf("[WARN] The new storage space is less than or equal to the current storage space, no need to resize") + return nil + } + if err = resizeKafkaInstanceStorage(ctx, d, client); err != nil { return err }