diff --git a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
index a6767f08..5dbc6e38 100644
--- a/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
+++ b/spark-doris-connector/src/main/java/org/apache/doris/spark/cfg/ConfigurationOptions.java
@@ -90,6 +90,10 @@ public interface ConfigurationOptions {
 
     int DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT = 50;
 
+    String DORIS_SINK_RETRY_INTERVAL_MS = "doris.sink.retry.interval.ms";
+
+    int DORIS_SINK_RETRY_INTERVAL_MS_DEFAULT = 50;
+
     /**
      * set types to ignore, split by comma
      * e.g.
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
index 54976a7d..4d70e1e5 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/sql/Utils.scala
@@ -176,7 +176,6 @@ private[spark] object Utils {
     val result = Try(f)
     result match {
       case Success(result) =>
-        LockSupport.parkNanos(interval.toNanos)
         Success(result)
       case Failure(exception: T) if retryTimes > 0 =>
         logger.warn(s"Execution failed caused by: ", exception)
diff --git a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
index 3fdfb793..21453b4b 100644
--- a/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
+++ b/spark-doris-connector/src/main/scala/org/apache/doris/spark/writer/DorisWriter.scala
@@ -31,6 +31,8 @@ import java.io.IOException
 import java.time.Duration
 import java.util
 import java.util.Objects
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.LockSupport
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.{Failure, Success}
@@ -46,8 +48,10 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
   private val sinkTaskPartitionSize: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_TASK_PARTITION_SIZE)
   private val sinkTaskUseRepartition: Boolean = settings.getProperty(ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION,
     ConfigurationOptions.DORIS_SINK_TASK_USE_REPARTITION_DEFAULT.toString).toBoolean
-  private val batchInterValMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
+  private val batchIntervalMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS,
     ConfigurationOptions.DORIS_SINK_BATCH_INTERVAL_MS_DEFAULT)
+  private val retryIntervalMs: Integer = settings.getIntegerProperty(ConfigurationOptions.DORIS_SINK_RETRY_INTERVAL_MS,
+    ConfigurationOptions.DORIS_SINK_RETRY_INTERVAL_MS_DEFAULT)
   private val enable2PC: Boolean = settings.getBooleanProperty(ConfigurationOptions.DORIS_SINK_ENABLE_2PC,
     ConfigurationOptions.DORIS_SINK_ENABLE_2PC_DEFAULT);
 
@@ -84,10 +88,12 @@ class DorisWriter(settings: SparkSettings) extends Serializable {
     resultRdd.foreachPartition(iterator => {
       while (iterator.hasNext) {
         // do load batch with retries
-        Utils.retry[Int, Exception](maxRetryTimes, Duration.ofMillis(batchInterValMs.toLong), logger) {
+        Utils.retry[Int, Exception](maxRetryTimes, Duration.ofMillis(retryIntervalMs.toLong), logger) {
           loadFunc(iterator.asJava, schema)
         } match {
-          case Success(txnId) => if (enable2PC) handleLoadSuccess(txnId, preCommittedTxnAcc)
+          case Success(txnId) =>
+            if (enable2PC) handleLoadSuccess(txnId, preCommittedTxnAcc)
+            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(batchIntervalMs.toLong))
           case Failure(e) =>
             if (enable2PC) handleLoadFailure(preCommittedTxnAcc)
             throw new IOException(
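
Usage sketch (not part of the patch): one way a Spark job might set the new retry interval alongside the existing batch interval. The retry key comes from ConfigurationOptions above; the batch-interval key is assumed to follow the same naming pattern, and the FE address, credentials, table identifier, and sample data are placeholders.

// Minimal sketch, assuming spark-doris-connector is on the classpath.
import org.apache.spark.sql.SparkSession

object DorisSinkIntervalExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("doris-sink-interval-example").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

    df.write
      .format("doris")
      .option("doris.fenodes", "fe_host:8030")        // placeholder FE address
      .option("doris.table.identifier", "db.table")   // placeholder target table
      .option("user", "root")                         // placeholder credentials
      .option("password", "")
      .option("doris.sink.batch.interval.ms", "50")   // pause after each successfully loaded batch
      .option("doris.sink.retry.interval.ms", "1000") // new option: pause between retries of a failed batch
      .save()

    spark.stop()
  }
}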