@@ -658,33 +658,45 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isCont
658658
659659 /* test */ void dynamicUpdateKafkaConfig (NodeRef nodeRef , Admin ac , KafkaConfigurationDiff configurationDiff )
660660 throws ForceableProblem , InterruptedException {
661- Map <ConfigResource , Collection <AlterConfigOp >> updatedPerBrokerConfig = new HashMap <>(2 );
662- Map <ConfigResource , Collection <AlterConfigOp >> updatedClusterWideConfig = new HashMap <>(1 );
663- var podId = nodeRef .nodeId ();
664- updatedPerBrokerConfig .put (getBrokersConfig (podId ), configurationDiff .getConfigDiff (Scope .PER_BROKER ));
665- updatedClusterWideConfig .put (getClusterWideConfig (), configurationDiff .getConfigDiff (Scope .CLUSTER_WIDE ));
666-
667- LOGGER .traceCr (reconciliation , "Updating cluster wide configuration with {}" , updatedClusterWideConfig );
668- LOGGER .debugCr (reconciliation , "Updating broker configuration {}" , nodeRef );
669- LOGGER .traceCr (reconciliation , "Updating broker configuration {} with {}" , nodeRef , updatedPerBrokerConfig );
670-
671- AlterConfigsResult alterClusterConfigResult = ac .incrementalAlterConfigs (updatedClusterWideConfig );
672- KafkaFuture <Void > clusterConfigFuture = alterClusterConfigResult .values ().get (getClusterWideConfig ());
673-
674- AlterConfigsResult alterBrokerConfigResult = ac .incrementalAlterConfigs (updatedPerBrokerConfig );
675- KafkaFuture <Void > brokerConfigFuture = alterBrokerConfigResult .values ().get (getBrokersConfig (podId ));
676-
677- await (VertxUtil .kafkaFutureToVertxFuture (reconciliation , vertx , clusterConfigFuture ), 30 , TimeUnit .SECONDS ,
678- error -> {
679- LOGGER .errorCr (reconciliation , "Error updating cluster-wide configuration" , error );
680- return new ForceableProblem ("Error updating cluster-wide configuration" , error );
681- });
682- await (VertxUtil .kafkaFutureToVertxFuture (reconciliation , vertx , brokerConfigFuture ), 30 , TimeUnit .SECONDS ,
683- error -> {
684- LOGGER .errorCr (reconciliation , "Error updating Kafka configuration for pod {}" , nodeRef , error );
685- return new ForceableProblem ("Error updating Kafka configuration for pod " + nodeRef , error );
686- });
687- LOGGER .infoCr (reconciliation , "Dynamic update of pod {} was successful." , nodeRef );
661+ Collection <AlterConfigOp > perBrokerDiff = configurationDiff .getConfigDiff (Scope .PER_BROKER );
662+ Collection <AlterConfigOp > clusterWideDiff = configurationDiff .getConfigDiff (Scope .CLUSTER_WIDE );
663+
664+ if (!perBrokerDiff .isEmpty ()) {
665+ Map <ConfigResource , Collection <AlterConfigOp >> updatedPerBrokerConfig = new HashMap <>(2 );
666+ updatedPerBrokerConfig .put (getBrokersConfig (nodeRef .nodeId ()), perBrokerDiff );
667+ LOGGER .debugCr (reconciliation , "Updating broker configuration {}" , nodeRef );
668+ LOGGER .traceCr (reconciliation , "Updating broker configuration {} with {}" , nodeRef , updatedPerBrokerConfig );
669+
670+ AlterConfigsResult alterBrokerConfigResult = ac .incrementalAlterConfigs (updatedPerBrokerConfig );
671+ KafkaFuture <Void > brokerConfigFuture = alterBrokerConfigResult .values ().get (getBrokersConfig (nodeRef .nodeId ()));
672+
673+ await (VertxUtil .kafkaFutureToVertxFuture (reconciliation , vertx , brokerConfigFuture ), 30 , TimeUnit .SECONDS ,
674+ error -> {
675+ LOGGER .errorCr (reconciliation , "Error updating Kafka configuration for pod {}" , nodeRef , error );
676+ return new ForceableProblem ("Error updating Kafka configuration for pod " + nodeRef , error );
677+ });
678+
679+ LOGGER .infoCr (reconciliation , "Dynamic update of pod {} was successful." , nodeRef );
680+ }
681+
682+ if (!clusterWideDiff .isEmpty ()) {
683+ Map <ConfigResource , Collection <AlterConfigOp >> updatedClusterWideConfig = new HashMap <>(1 );
684+ updatedClusterWideConfig .put (getClusterWideConfig (), clusterWideDiff );
685+
686+ LOGGER .debugCr (reconciliation , "Updating cluster-wide configuration" );
687+ LOGGER .traceCr (reconciliation , "Updating cluster-wide configuration with {}" , updatedClusterWideConfig );
688+
689+ AlterConfigsResult alterClusterConfigResult = ac .incrementalAlterConfigs (updatedClusterWideConfig );
690+ KafkaFuture <Void > clusterConfigFuture = alterClusterConfigResult .values ().get (getClusterWideConfig ());
691+
692+ await (VertxUtil .kafkaFutureToVertxFuture (reconciliation , vertx , clusterConfigFuture ), 30 , TimeUnit .SECONDS ,
693+ error -> {
694+ LOGGER .errorCr (reconciliation , "Error updating cluster-wide configuration" , error );
695+ return new ForceableProblem ("Error updating cluster-wide configuration" , error );
696+ });
697+
698+ LOGGER .infoCr (reconciliation , "Dynamic update of cluster-wide configuration(s)" );
699+ }
688700 }
689701
690702 /** Exceptions which we're prepared to ignore (thus forcing a restart) in some circumstances. */
0 commit comments