diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 66d9b628fa3f2..e007f53054d3b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -29,6 +29,7 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -57,6 +58,7 @@ import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
+import static org.apache.hadoop.io.Sizes.S_0;

 /****************************************************************
  * Abstract Checksumed FileSystem.
@@ -101,6 +103,14 @@ public void setVerifyChecksum(boolean verifyChecksum) {
     this.verifyChecksum = verifyChecksum;
   }

+  /**
+   * Is checksum verification enabled?
+   * @return true if files are to be verified through checksums.
+   */
+  public boolean getVerifyChecksum() {
+    return verifyChecksum;
+  }
+
   @Override
   public void setWriteChecksum(boolean writeChecksum) {
     this.writeChecksum = writeChecksum;
@@ -165,7 +175,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {

   /*******************************************************
    * For open()'s FSInputStream
-   * It verifies that data matches checksums.
+   * It verifies that data matches checksums iff the data
+   * file has matching checksums.
    *******************************************************/
   private static class ChecksumFSInputChecker extends FSInputChecker implements
       IOStatisticsSource, StreamCapabilities {
@@ -426,8 +437,18 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
       return data;
     }

+    /**
+     * Turn off range merging to make buffer recycling more likely (but not guaranteed).
+     * @return 0, always
+     */
+    @Override
+    public int maxReadSizeForVectorReads() {
+      return S_0;
+    }
+
     /**
      * Vectored read.
+     *
+     * If the file has no checksums: delegate to the underlying stream.
+     * If the file is checksummed: calculate the checksum ranges as
+     * well as the data ranges, read both, and validate the checksums
@@ -448,10 +469,12 @@ public void readVectored(final List<? extends FileRange> ranges,
        final Consumer<ByteBuffer> release) throws IOException {
+ * Vector reading is delegated whenever there are no checksums for
+ * the data file, or when validating checksums has been delegated.
+ * @return true if vector reads are to be directly handled by
+ * the data stream.
+ */
+ private boolean dataStreamToHandleVectorIO() {
+ return sums == null;
+ }
+
+ /**
+ * For this stream, declare that range merging may take place;
+ * otherwise delegate to the inner stream.
+ * @param capability string to query the stream support for.
+ * @return true for the sliced vector IO probe if checksum validation
+ * is taking place; false if there are no checksums to validate.
+ * For all other probes: pass to the wrapped stream.
+ */
@Override
public boolean hasCapability(String capability) {
- return datas.hasCapability(capability);
+ switch (capability.toLowerCase(Locale.ENGLISH)) {
+ // slicing can take place during coalescing and checksumming
+ case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED:
+ return !dataStreamToHandleVectorIO();
+ default:
+ return datas.hasCapability(capability);
+ }
}
}
@@ -1142,6 +1193,9 @@ public boolean hasPathCapability(final Path path, final String capability)
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
return false;
+ case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED:
+ return getVerifyChecksum();
+
default:
return super.hasPathCapability(p, capability);
}
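
Taken together, the new stream and path capability probes let callers decide whether it is safe to recycle the buffers returned by a vectored read. The sketch below is illustrative only (the class and method names are invented for the example); it assumes an already-open FSDataInputStream and uses the capability constant added by this patch:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.StreamCapabilities;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    final class SlicedBufferProbeExample {
      /** Recycle buffers after a vectored read only when slicing cannot occur. */
      static void readAndMaybeRecycle(FSDataInputStream in) throws Exception {
        ElasticByteBufferPool pool = new ElasticByteBufferPool();
        List<FileRange> ranges = Arrays.asList(
            FileRange.createFileRange(0, 4096),
            FileRange.createFileRange(65536, 4096));
        in.readVectored(ranges, size -> pool.getBuffer(false, size));
        // if the buffers may be sliced, they must not be returned to the pool
        boolean sliced = in.hasCapability(StreamCapabilities.VECTOREDIO_BUFFERS_SLICED);
        for (FileRange range : ranges) {
          ByteBuffer buffer = range.getData().join();
          // ... consume the buffer ...
          if (!sliced) {
            pool.putBuffer(buffer);
          }
        }
      }
    }
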
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 898afdfa86eac..f58331baa81a1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -510,4 +510,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String HADOOP_SECURITY_RESOLVER_IMPL =
"hadoop.security.resolver.impl";
+ /**
+ * Verify checksums on read; default is true.
+ *
+ * {@value}.
+ */
+ public static final String LOCAL_FS_VERIFY_CHECKSUM = "fs.file.checksum.verify";
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
index 590cbd9a49ece..e912d2245bed4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.LOCAL_FS_VERIFY_CHECKSUM;
+
/****************************************************************
* Implement the FileSystem API for the checksumed local filesystem.
*
@@ -50,6 +52,10 @@ public void initialize(URI name, Configuration conf) throws IOException {
if (!scheme.equals(fs.getUri().getScheme())) {
swapScheme = scheme;
}
+ final boolean checksum = conf.getBoolean(LOCAL_FS_VERIFY_CHECKSUM, true);
+ setVerifyChecksum(checksum);
+ LOG.debug("Checksum verification enabled={}", checksum);
+
}
/**
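
For reference, a minimal usage sketch of the new key (the key name is from this patch; newInstanceLocal() is used here only so the example bypasses the FileSystem cache):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;

    final class LocalChecksumConfigExample {
      static LocalFileSystem openWithoutChecksumVerification() throws Exception {
        Configuration conf = new Configuration();
        // new key added by this patch; the default remains true
        conf.setBoolean("fs.file.checksum.verify", false);
        // initialize() will call setVerifyChecksum(false) on the new instance,
        // so reads are not validated against the .crc files
        return FileSystem.newInstanceLocal(conf);
      }
    }
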
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index d5f545b460d7e..3bd93a4f459c3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -72,6 +72,7 @@
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
+import static org.apache.hadoop.fs.VectoredReadUtils.hasVectorIOCapability;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -293,15 +294,12 @@ public FileDescriptor getFileDescriptor() throws IOException {
@Override
public boolean hasCapability(String capability) {
- // a bit inefficient, but intended to make it easier to add
- // new capabilities.
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.IOSTATISTICS_CONTEXT:
- case StreamCapabilities.VECTOREDIO:
return true;
default:
- return false;
+ return hasVectorIOCapability(capability);
}
}
@@ -400,7 +398,9 @@ private void initiateRead() {
for(int i = 0; i < ranges.size(); ++i) {
FileRange range = ranges.get(i);
buffers[i] = allocateRelease.getBuffer(false, range.getLength());
- channel.read(buffers[i], range.getOffset(), i, this);
+ final ByteBuffer buffer = buffers[i];
+ LOG.debug("Reading file range {} into buffer {}", range, System.identityHashCode(buffer));
+ channel.read(buffer, range.getOffset(), i, this);
}
}
@@ -416,6 +416,8 @@ private void initiateRead() {
public void completed(Integer result, Integer rangeIndex) {
FileRange range = ranges.get(rangeIndex);
ByteBuffer buffer = buffers[rangeIndex];
+ LOG.debug("Completed read range {} into buffer {} outcome={} remaining={}",
+ range, System.identityHashCode(buffer), result, buffer.remaining());
if (result == -1) {
// no data was read back.
failed(new EOFException("Read past End of File"), rangeIndex);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 93ed57ef83057..955040d91a36e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -86,6 +86,16 @@ public interface StreamCapabilities {
*/
String VECTOREDIO = "in:readvectored";
+ /**
+ * Probe for vector IO implementation details: {@value}.
+ * When performing vectored IO operations, are the buffers returned by readVectored()
+ * potentially sliced subsets of the buffers allocated by the allocate() function
+ * passed in with the read requests?
+ * If true, callers must assume that a returned buffer may be a sliced
+ * subset of an allocated buffer.
+ */
+ String VECTOREDIO_BUFFERS_SLICED = "fs.capability.vectoredio.sliced";
+
/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
* This matches the Path Capability
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
index 6adcba39a3fa5..a4c2d69b63bc9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
+import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -476,6 +477,22 @@ public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset,
return readData;
}
+ /**
+ * Default vector IO probes.
+ * These are the capabilities which streams that leave vector IO to the
+ * default methods should declare when probed for vector capabilities.
+ * @param capability capability to probe for.
+ * @return true if the given capability holds for vectored IO features.
+ */
+ public static boolean hasVectorIOCapability(String capability) {
+ switch (capability.toLowerCase(Locale.ENGLISH)) {
+ case StreamCapabilities.VECTOREDIO:
+ return true;
+ default:
+ return false;
+ }
+ }
+
/**
* private constructor.
*/
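
The intended use of this helper, sketched below with a hypothetical stream class, is for streams which rely on the default readVectored() implementation to delegate their vector IO probes to it, much as RawLocalFileSystem now does:

    import java.util.Locale;

    import org.apache.hadoop.fs.StreamCapabilities;
    import org.apache.hadoop.fs.VectoredReadUtils;

    // hypothetical stream which leaves readVectored() to the default implementation
    final class ExampleStream implements StreamCapabilities {
      @Override
      public boolean hasCapability(String capability) {
        switch (capability.toLowerCase(Locale.ENGLISH)) {
        case StreamCapabilities.IOSTATISTICS:
          return true;
        default:
          // declare the default vector IO capabilities (in:readvectored)
          return VectoredReadUtils.hasVectorIOCapability(capability);
        }
      }
    }
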
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java
new file mode 100644
index 0000000000000..0abe678ab745c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import static java.lang.System.identityHashCode;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers
+ * are released.
+ *
+ * It throws the related exception at {@link #close()} if any buffer remains un-released.
+ * It also clears the buffers at release time, so any continued use after
+ * release is likely to generate errors.
+ *
+ * To be used for testing.
+ *
+ * The stack traces of the allocations are not stored by default because
+ * collecting them can significantly decrease unit test performance.
+ * Configuring this class to log at DEBUG will trigger their collection.
+ * @see ByteBufferAllocationStacktraceException
+ *
+ * Adapted from Parquet class {@code org.apache.parquet.bytes.TrackingByteBufferAllocator}.
+ */
+@VisibleForTesting
+public final class TrackingByteBufferPool implements ByteBufferPool, AutoCloseable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TrackingByteBufferPool.class);
+
+ /**
+ * Wrap an existing allocator with this tracking allocator.
+ * @param allocator allocator to wrap.
+ * @return a new allocator.
+ */
+ public static TrackingByteBufferPool wrap(ByteBufferPool allocator) {
+ return new TrackingByteBufferPool(allocator);
+ }
+
+ public static class LeakDetectorHeapByteBufferPoolException
+ extends RuntimeException {
+
+ private LeakDetectorHeapByteBufferPoolException(String msg) {
+ super(msg);
+ }
+
+ private LeakDetectorHeapByteBufferPoolException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ private LeakDetectorHeapByteBufferPoolException(
+ String message,
+ Throwable cause,
+ boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+ }
+
+ /**
+ * Stack trace of an allocation, as saved in the tracking map.
+ */
+ public static final class ByteBufferAllocationStacktraceException
+ extends LeakDetectorHeapByteBufferPoolException {
+
+ /**
+ * Single stack trace instance to use when DEBUG is not enabled.
+ */
+ private static final ByteBufferAllocationStacktraceException WITHOUT_STACKTRACE =
+ new ByteBufferAllocationStacktraceException(false);
+
+ /**
+ * Create a stack trace for the map, either using the shared static one
+ * or a dynamically created one.
+ * @return a stack trace
+ */
+ private static ByteBufferAllocationStacktraceException create() {
+ return LOG.isDebugEnabled()
+ ? new ByteBufferAllocationStacktraceException()
+ : WITHOUT_STACKTRACE;
+ }
+
+ private ByteBufferAllocationStacktraceException() {
+ super("Allocation stacktrace of the first ByteBuffer:");
+ }
+
+ /**
+ * Private constructor for the singleton {@link #WITHOUT_STACKTRACE},
+ * telling developers how to see a trace per buffer.
+ */
+ private ByteBufferAllocationStacktraceException(boolean unused) {
+ super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for stack traces",
+ null,
+ false,
+ false);
+ }
+ }
+
+ /**
+ * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} if the
+ * buffer to release was not in the hash map.
+ */
+ public static final class ReleasingUnallocatedByteBufferException
+ extends LeakDetectorHeapByteBufferPoolException {
+
+ private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
+ super(String.format("Releasing a ByteBuffer instance that is not allocated"
+ + " by this buffer pool or already been released: %s size %d; hash code %s",
+ b, b.capacity(), identityHashCode(b)));
+ }
+ }
+
+ /**
+ * Exception raised in {@link TrackingByteBufferPool#close()} if there
+ * was an unreleased buffer.
+ */
+ public static final class LeakedByteBufferException
+ extends LeakDetectorHeapByteBufferPoolException {
+
+ private final int count;
+
+ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceException e) {
+ super(count + " ByteBuffer object(s) is/are remained unreleased"
+ + " after closing this buffer pool.", e);
+ this.count = count;
+ }
+
+ /**
+ * Get the number of unreleased buffers.
+ * @return number of unreleased buffers
+ */
+ public int getCount() {
+ return count;
+ }
+ }
+
+ /**
+ * Tracker of allocations.
+ *
+ * The key maps by the object id of the buffer, and refers to either a common stack trace
+ * or one dynamically created for each allocation.
+ */
+ private final Map<ByteBuffer, ByteBufferAllocationStacktraceException> allocated =
+     new IdentityHashMap<>();
+
+ /** The wrapped buffer pool. */
+ private final ByteBufferPool allocator;
+
+ /**
+ * Number of buffer allocations.
+ *
+ * This is incremented in {@link #getBuffer(boolean, int)}.
+ */
+ private final AtomicInteger bufferAllocations = new AtomicInteger();
+
+ /**
+ * Number of buffer releases.
+ *
+ * This is incremented in {@link #putBuffer(ByteBuffer)}.
+ */
+ private final AtomicInteger bufferReleases = new AtomicInteger();
+
+ /**
+ * private constructor.
+ * @param allocator pool allocator.
+ */
+ private TrackingByteBufferPool(ByteBufferPool allocator) {
+ this.allocator = allocator;
+ }
+
+ public int getBufferAllocations() {
+ return bufferAllocations.get();
+ }
+
+ public int getBufferReleases() {
+ return bufferReleases.get();
+ }
+
+ /**
+ * Get a buffer from the pool.
+ *
+ * This increments the {@link #bufferAllocations} counter and stores the
+ * singleton or per-allocation stack trace in the {@link #allocated} map.
+ * @param direct whether to allocate a direct buffer or not
+ * @param size size of the buffer to allocate
+ * @return a ByteBuffer instance
+ */
+ @Override
+ public synchronized ByteBuffer getBuffer(final boolean direct, final int size) {
+ bufferAllocations.incrementAndGet();
+ ByteBuffer buffer = allocator.getBuffer(direct, size);
+ final ByteBufferAllocationStacktraceException ex =
+ ByteBufferAllocationStacktraceException.create();
+ allocated.put(buffer, ex);
+ LOG.debug("Creating ByteBuffer:{} size {} {}",
+ identityHashCode(buffer), size, buffer, ex);
+ return buffer;
+ }
+
+ /**
+ * Release a buffer back to the pool.
+ *
+ * This increments the {@link #bufferReleases} counter and removes the
+ * buffer from the {@link #allocated} map.
+ *
+ * If the buffer was not allocated by this pool, it throws
+ * {@link ReleasingUnallocatedByteBufferException}.
+ *
+ * @param buffer buffer to release
+ * @throws ReleasingUnallocatedByteBufferException if the buffer was not allocated by this pool
+ */
+ @Override
+ public synchronized void putBuffer(ByteBuffer buffer)
+ throws ReleasingUnallocatedByteBufferException {
+
+ bufferReleases.incrementAndGet();
+ requireNonNull(buffer);
+ LOG.debug("Releasing ByteBuffer: {}: {}", identityHashCode(buffer), buffer);
+ if (allocated.remove(buffer) == null) {
+ throw new ReleasingUnallocatedByteBufferException(buffer);
+ }
+ allocator.putBuffer(buffer);
+ // Clearing the buffer so subsequent access would probably generate errors
+ buffer.clear();
+ }
+
+ /**
+ * Check if the buffer is in the pool.
+ * @param buffer buffer
+ * @return true if the buffer is in the pool
+ */
+ public boolean containsBuffer(ByteBuffer buffer) {
+ return allocated.containsKey(requireNonNull(buffer));
+ }
+
+ /**
+ * Get the number of allocated buffers.
+ * @return number of allocated buffers
+ */
+ public int size() {
+ return allocated.size();
+ }
+
+ /**
+ * Expect all buffers to be released; if not, log the unreleased ones
+ * and then raise an exception with the stack trace of the first
+ * unreleased buffer.
+ * @throws LeakedByteBufferException if at least one buffer was not released
+ */
+ @Override
+ public void close() throws LeakedByteBufferException {
+ if (!allocated.isEmpty()) {
+ allocated.keySet().forEach(buffer ->
+ LOG.warn("Unreleased ByteBuffer {}; {}", identityHashCode(buffer), buffer));
+ LeakedByteBufferException ex = new LeakedByteBufferException(
+ allocated.size(),
+ allocated.values().iterator().next());
+ allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd
+ throw ex;
+ }
+ }
+}
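
A sketch of how a test might use this pool to detect leaked buffers around a vectored read; the stream set-up is assumed to exist elsewhere, and the example assumes the stream does not report fs.capability.vectoredio.sliced (otherwise the returned buffers may be slices which the pool will reject):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileRange;
    import org.apache.hadoop.fs.impl.TrackingByteBufferPool;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    final class BufferLeakCheckExample {
      /** Fail via LeakedByteBufferException if any allocated buffer is not returned. */
      static void verifyNoBufferLeaks(FSDataInputStream in) throws Exception {
        try (TrackingByteBufferPool pool =
                 TrackingByteBufferPool.wrap(new ElasticByteBufferPool())) {
          List<FileRange> ranges = Arrays.asList(FileRange.createFileRange(0, 1024));
          in.readVectored(ranges, size -> pool.getBuffer(false, size), pool::putBuffer);
          ByteBuffer buffer = ranges.get(0).getData().join();
          // ... consume the buffer, then release it back to the tracking pool ...
          pool.putBuffer(buffer);
        } // close() throws LeakedByteBufferException if anything was left unreleased
      }
    }
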
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java
index bf2dc78741f51..7bfce520910c5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java
@@ -31,6 +31,9 @@
@InterfaceStability.Evolving
public final class Sizes {
+ /** 0 bytes: {@value}. Here to make it easy to find use of zero in constants. */
+ public static final int S_0 = 0;
+
/** 2^8 bytes: {@value}. */
public static final int S_256 = 256;
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 08197a8377e21..a0e7e9520ba1a 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1403,6 +1403,24 @@
* This is a complex suite as it really is testing the store, so measurements of
* what IO took place is also performed if the input stream is suitable for this.
*/
@@ -96,15 +96,12 @@ protected AbstractFSContract createContract(Configuration conf) {
}
/**
- * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
- * @throws Exception
+ * Create a configuration.
+ * @return a configuration
*/
- @BeforeEach
@Override
- public void setup() throws Exception {
- super.setup();
- skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
- "Analytics Accelerator does not support vectored reads");
+ protected Configuration createConfiguration() {
+ return disableAnalyticsAccelerator(super.createConfiguration());
}
/**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 17159b901e256..d244e5004841e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -108,6 +108,7 @@
import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet;
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_S3;
import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Analytics;
+import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Classic;
import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Prefetch;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
@@ -1882,6 +1883,18 @@ public static Configuration enableAnalyticsAccelerator(Configuration conf) {
return conf;
}
+ /**
+ * Disable the analytics stream for the S3A filesystem in tests.
+ * @param conf Configuration to update
+ * @return patched config
+ */
+ public static Configuration disableAnalyticsAccelerator(Configuration conf) {
+ removeBaseAndBucketOverrides(conf,
+ INPUT_STREAM_TYPE);
+ conf.setEnum(INPUT_STREAM_TYPE, Classic);
+ return conf;
+ }
+
/**
* Probe for a filesystem having a specific stream type;
* this is done through filesystem capabilities.