diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 6b910c6538070..424f5b1bede30 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -21,9 +21,16 @@
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
 
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
 import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
 import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
 import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
@@ -37,6 +44,11 @@
 import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.VectoredReadUtils;
+
+import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
+
 
 /**
  * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
@@ -128,6 +140,42 @@ public int read(byte[] buf, int off, int len) throws IOException {
     return bytesRead;
   }
 
+  /**
+   * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+   * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
+   * {@inheritDoc}
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+      IntFunction<ByteBuffer> allocate) throws IOException {
+    readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+   * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
+   * {@inheritDoc}
+   */
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+      final IntFunction<ByteBuffer> allocate,
+      final Consumer<ByteBuffer> release) throws IOException {
+    LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
+    throwIfClosed();
+
+    List<ObjectRange> objectRanges = new ArrayList<>();
+
+    for (FileRange range : ranges) {
+      CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+      ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength());
+      objectRanges.add(objectRange);
+      range.setData(result);
+    }
+
+    // AAL does not do any range coalescing, so input and combined ranges are the same.
+    this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
+    inputStream.readVectored(objectRanges, allocate, release);
+  }
 
   @Override
   public boolean seekToNewSource(long l) throws IOException {
     return false;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
index 351b263e56fb7..23debd564160c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
@@ -19,15 +19,28 @@
 package org.apache.hadoop.fs.contract.s3a;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.test.tags.IntegrationTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedClass;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 
 /**
  * S3A contract tests for vectored reads with the Analytics stream.
 */
@@ -64,7 +77,6 @@ protected Configuration createConfiguration() {
     // This issue is tracked in:
     // https://github.com/awslabs/analytics-accelerator-s3/issues/218
     skipForAnyEncryptionExceptSSES3(conf);
-    conf.set("fs.contract.vector-io-early-eof-check", "false");
     return conf;
   }
 
@@ -72,4 +84,41 @@ protected Configuration createConfiguration() {
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
+
+  /**
+   * When the offset is negative, AAL returns IllegalArgumentException, whereas the base implementation will return
+   * an EoF.
+   */
+  @Override
+  public void testNegativeOffsetRange() throws Exception {
+    verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), IllegalArgumentException.class);
+  }
+
+  /**
+   * Currently there is no null check on the release operation, this will be fixed in the next AAL version.
+   */
+  @Override
+  public void testNullReleaseOperation() {
+    skip("AAL currently does not do a null check on the release operation");
+  }
+
+  @Test
+  public void testReadVectoredWithAALStatsCollection() throws Exception {
+
+    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
+    try (FSDataInputStream in = openVectorFile()) {
+      in.readVectored(fileRanges, getAllocate());
+
+      validateVectoredReadResult(fileRanges, DATASET, 0);
+      IOStatistics st = in.getIOStatistics();
+
+      // Statistics such as GET requests will be added after IoStats support.
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);
+
+      verifyStatisticCounterValue(st,
+          StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+          1);
+    }
+  }
 }