diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 09a2f1549a742..816f09be6d1bb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -363,6 +363,9 @@ public void close() throws IOException { if (abfsApacheHttpClient != null) { abfsApacheHttpClient.close(); } + if (intercept != null) { + IOUtils.cleanupWithLogger(LOG, intercept); + } if (tokenProvider instanceof Closeable) { IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java index f1eb3a2a77476..38878f8b5efd0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java @@ -18,6 +18,9 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.Closeable; +import java.io.IOException; + import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; @@ -34,7 +37,7 @@ import static org.apache.hadoop.util.Time.now; -class AbfsClientThrottlingAnalyzer { +class AbfsClientThrottlingAnalyzer implements Closeable { private static final Logger LOG = LoggerFactory.getLogger( AbfsClientThrottlingAnalyzer.class); private static final int MIN_ANALYSIS_PERIOD_MS = 1000; @@ -172,6 +175,22 @@ public boolean suspendIfNecessary() { return false; } + /** + * Closes the throttling analyzer and releases associated resources. + * This method cancels the internal timer and cleans up any pending timer tasks. + * It is safe to call this method multiple times. + * + * @throws IOException if an I/O error occurs during cleanup + */ +@Override +public void close() throws IOException { + if (timer != null) { + timer.cancel(); + timer.purge(); + timer = null; + } +} + @VisibleForTesting int getSleepDuration() { return sleepDuration; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java index 39aaf34db0d57..1afb642d798f5 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; import java.net.HttpURLConnection; import java.util.concurrent.locks.ReentrantLock; @@ -223,4 +224,18 @@ private static long getContentLengthIfKnown(String range) { } return contentLength; } + + /** + * Closes the throttling intercept and releases associated resources. + * This method closes both the read and write throttling analyzers. + */ + @Override + public void close() throws IOException { + if (readThrottler != null) { + readThrottler.close(); + } + if (writeThrottler != null) { + writeThrottler.close(); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java index 58e50592997dc..92578021a9584 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; + /** * Implementation of {@link AbfsThrottlingIntercept} that does not throttle * the ABFS process. @@ -40,4 +42,12 @@ public void updateMetrics(final AbfsRestOperationType operationType, public void sendingRequest(final AbfsRestOperationType operationType, final AbfsCounters abfsCounters) { } + +/** + * No-op implementation of close method. + */ + @Override + public void close() throws IOException { + // No resources to clean up in no-op implementation + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java index 725377714642b..6ce1979df9323 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java @@ -26,7 +26,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Unstable -public interface AbfsThrottlingIntercept { +public interface AbfsThrottlingIntercept extends Closable { /** * Updates the metrics for successful and failed read and write operations. @@ -47,4 +47,11 @@ void updateMetrics(AbfsRestOperationType operationType, void sendingRequest(AbfsRestOperationType operationType, AbfsCounters abfsCounters); + /** + * Closes the throttling intercept and releases associated resources. + * @throws IOException if an I/O error occurs during cleanup + */ + @Override + void close() throws IOException; + }