From fdb9625c4ecd323bf4d2a8485a6665618baf8cd1 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Wed, 11 Jun 2025 16:03:13 +0100 Subject: [PATCH 1/2] integrates with AAL's readVectored() --- .../fs/s3a/impl/streams/AnalyticsStream.java | 55 +++++++++++++++++++ ...3AContractAnalyticsStreamVectoredRead.java | 33 +++++++++++ 2 files changed, 88 insertions(+) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 6b910c6538070..a91558f075d60 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -21,9 +21,19 @@ import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.IntFunction; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.VectoredReadUtils; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.common.ObjectRange; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; import software.amazon.s3.analyticsaccelerator.util.InputPolicy; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; @@ -38,6 +48,9 @@ import org.apache.hadoop.fs.s3a.S3AInputPolicy; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED; + + /** * Analytics stream creates a stream using aws-analytics-accelerator-s3. 
This stream supports * parquet specific optimisations such as parquet-aware prefetching. For more details, see @@ -128,6 +141,48 @@ public int read(byte[] buf, int off, int len) throws IOException { return bytesRead; } + /** + * {@inheritDoc} + * Pass to {@link #readVectored(List, IntFunction, Consumer)} + * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(List ranges, + IntFunction allocate) throws IOException { + readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED); + } + + /** + * {@inheritDoc} + * Pass to {@link #readVectored(List, IntFunction, Consumer)} + * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. + * @throws IOException IOE if any. + */ + @Override + public void readVectored(final List ranges, + final IntFunction allocate, + final Consumer release) throws IOException { + LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges); + throwIfClosed(); + + List objectRanges = new ArrayList<>(); + + for (FileRange range : ranges) { + CompletableFuture result = new CompletableFuture<>(); + ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength()); + objectRanges.add(objectRange); + range.setData(result); + } + + // AAL does not do any range coalescing, so input and combined ranges are the same. 
+ this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size()); + inputStream.readVectored(objectRanges, allocate, release); + } @Override public boolean seekToNewSource(long l) throws IOException { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java index 351b263e56fb7..ed2b9c16f222c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java @@ -19,15 +19,24 @@ package org.apache.hadoop.fs.contract.s3a; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.tags.IntegrationTest; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.MethodSource; +import java.util.List; + import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; /** * S3A contract tests for vectored reads with the Analytics stream. 
@@ -72,4 +81,28 @@ protected Configuration createConfiguration() { protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + + @Override + public void testNegativeOffsetRange() throws Exception { + verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), IllegalArgumentException.class); + } + + @Test + public void testReadVectoredWithAALStatsCollection() throws Exception { + + List fileRanges = createSampleNonOverlappingRanges(); + try (FSDataInputStream in = openVectorFile()){ + in.readVectored(fileRanges, getAllocate()); + + IOStatistics st = in.getIOStatistics(); + + // Statistics such as GET requests will be added after IoStats support. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1); + + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 1); + } + } } From f334768a2612a2a897d6bcb8157331b40d82ea62 Mon Sep 17 00:00:00 2001 From: Ahmar Suhail Date: Wed, 30 Jul 2025 14:49:17 +0100 Subject: [PATCH 2/2] review comments --- .../fs/s3a/impl/streams/AnalyticsStream.java | 15 ++++---------- ...3AContractAnalyticsStreamVectoredRead.java | 20 +++++++++++++++++-- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index a91558f075d60..424f5b1bede30 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -24,13 +24,10 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.IntFunction; -import org.apache.hadoop.fs.FileRange; 
-import org.apache.hadoop.fs.VectoredReadUtils; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; import software.amazon.s3.analyticsaccelerator.common.ObjectRange; @@ -47,6 +44,8 @@ import org.apache.hadoop.fs.s3a.Retries; import org.apache.hadoop.fs.s3a.S3AInputPolicy; import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.VectoredReadUtils; import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED; @@ -142,12 +141,9 @@ public int read(byte[] buf, int off, int len) throws IOException { } /** - * {@inheritDoc} * Pass to {@link #readVectored(List, IntFunction, Consumer)} * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. - * @param ranges the byte ranges to read. - * @param allocate the function to allocate ByteBuffer. - * @throws IOException IOE if any. + * {@inheritDoc} */ @Override public void readVectored(List ranges, @@ -156,12 +152,9 @@ public void readVectored(List ranges, } /** - * {@inheritDoc} * Pass to {@link #readVectored(List, IntFunction, Consumer)} * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser. - * @param ranges the byte ranges to read. - * @param allocate the function to allocate ByteBuffer. - * @throws IOException IOE if any. 
+ * {@inheritDoc} */ @Override public void readVectored(final List ranges, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java index ed2b9c16f222c..23debd564160c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.test.tags.IntegrationTest; import org.junit.jupiter.api.Test; @@ -33,7 +34,10 @@ import org.junit.jupiter.params.provider.MethodSource; import java.util.List; +import java.util.function.Consumer; +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; +import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; @@ -73,7 +77,6 @@ protected Configuration createConfiguration() { // This issue is tracked in: // https://github.com/awslabs/analytics-accelerator-s3/issues/218 skipForAnyEncryptionExceptSSES3(conf); - conf.set("fs.contract.vector-io-early-eof-check", "false"); return conf; } @@ -82,18 +85,31 @@ protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + /** + * When the offset is negative, AAL returns IllegalArgumentException, whereas the base 
implementation will throw + * an EOFException. + */ @Override public void testNegativeOffsetRange() throws Exception { verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), IllegalArgumentException.class); } + /** + * Currently there is no null check on the release operation; this will be fixed in the next AAL version. + */ + @Override + public void testNullReleaseOperation() { + skip("AAL currently does not do a null check on the release operation"); + } + @Test public void testReadVectoredWithAALStatsCollection() throws Exception { List fileRanges = createSampleNonOverlappingRanges(); - try (FSDataInputStream in = openVectorFile()){ + try (FSDataInputStream in = openVectorFile()) { in.readVectored(fileRanges, getAllocate()); + validateVectoredReadResult(fileRanges, DATASET, 0); IOStatistics st = in.getIOStatistics(); // Statistics such as GET requests will be added after IoStats support.