
Commit 1096112

HADOOP-18296. Buffer slicing in checksum fs
Wire up TrackingByteBufferPool to the vector read tests: the test suite AbstractContractVectoredReadTest adds a test, testBufferSlicing(), to generate conditions which may trigger slicing. Only streams which declare that they may slice buffers are permitted to return buffers to the pool which aren't known about, or to close the pool with outstanding entries. So: no fix, just a public declaration of the behavior and a test to verify that no other streams are doing it.
1 parent 86d6bac commit 1096112
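The mechanism here is a stream capability probe: a stream whose vectored reads may return sliced buffers declares VECTOREDIO_BUFFERS_SLICED, and callers adjust their buffer-pool bookkeeping accordingly. A minimal sketch of that probe, assuming a filesystem fs, a path, a list of ranges and a pool, none of which are part of this commit:

    // Probe the capability before trusting that every buffer filled by
    // readVectored() came from (and can be returned to) the allocator's pool.
    try (FSDataInputStream in = fs.openFile(path).build().get()) {
      boolean sliced = in.hasCapability(StreamCapabilities.VECTOREDIO_BUFFERS_SLICED);
      in.readVectored(ranges, size -> pool.getBuffer(false, size), pool::putBuffer);
      // when sliced is true, some range buffers may be slices of larger pooled
      // buffers and must not be handed back to the pool.
    }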

File tree: 4 files changed (+130, -15 lines)


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java

Lines changed: 1 addition & 1 deletion
@@ -54,7 +54,7 @@ public void initialize(URI name, Configuration conf) throws IOException {
     }
     final boolean checksum = conf.getBoolean(LOCAL_FS_VERIFY_CHECKSUM, true);
     setVerifyChecksum(checksum);
-    LOG.debug("Checksum verification enabled: {}", checksum);
+    LOG.debug("Checksum verification enabled={}", checksum);
 
   }
 
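As a usage note, a hedged sketch of driving this option from configuration; the LOCAL_FS_VERIFY_CHECKSUM constant and its default of true are taken from the diff above, while the static import and surrounding setup are assumptions:

    // Assumed usage: disable checksum verification before creating the local FS.
    Configuration conf = new Configuration();
    conf.setBoolean(LOCAL_FS_VERIFY_CHECKSUM, false);
    // initialize() above will now log "Checksum verification enabled=false".
    LocalFileSystem fs = FileSystem.getLocal(conf);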

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java

Lines changed: 57 additions & 11 deletions
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -135,8 +136,7 @@ private ByteBufferAllocationStacktraceException() {
   }
 
   private ByteBufferAllocationStacktraceException(boolean unused) {
-    super(
-        "Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for full stack traces",
+    super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for stack traces",
         null,
         false,
         false);
@@ -147,20 +147,31 @@ private ByteBufferAllocationStacktraceException(boolean unused) {
    * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} if the
    * buffer to release was not in the hash map.
    */
-  public static final class ReleasingUnallocatedByteBufferException extends LeakDetectorHeapByteBufferPoolException {
+  public static final class ReleasingUnallocatedByteBufferException
+      extends LeakDetectorHeapByteBufferPoolException {
 
-    private ReleasingUnallocatedByteBufferException() {
-      super("Releasing a ByteBuffer instance that is not allocated by this buffer pool or already been released");
+    private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
+      super(String.format("Releasing a ByteBuffer instance that is not allocated"
+          + " by this buffer pool or already been released: %s size %d", b, b.capacity()));
     }
   }
 
   /**
    * Exception raised in {@link TrackingByteBufferPool#close()} if there was an unreleased buffer.
    */
-  public static class LeakedByteBufferException extends LeakDetectorHeapByteBufferPoolException {
+  public static final class LeakedByteBufferException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+    private final int count;
 
     private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceException e) {
-      super(count + " ByteBuffer object(s) is/are remained unreleased after closing this buffer pool.", e);
+      super(count + " ByteBuffer object(s) is/are remained unreleased"
+          + " after closing this buffer pool.", e);
+      this.count = count;
+    }
+
+    public int getCount() {
+      return count;
     }
   }
 
@@ -177,6 +188,9 @@ private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceExcep
    */
   private final ByteBufferPool allocator;
 
+  private final AtomicInteger bufferAllocations = new AtomicInteger();
+  private final AtomicInteger bufferReleases = new AtomicInteger();
+
   /**
    * private constructor.
    * @param allocator pool allocator.
@@ -185,29 +199,61 @@ private TrackingByteBufferPool(ByteBufferPool allocator) {
     this.allocator = allocator;
   }
 
+  public int getBufferAllocations() {
+    return bufferAllocations.get();
+  }
+
+  public int getBufferReleases() {
+    return bufferReleases.get();
+  }
+
   @Override
-  public ByteBuffer getBuffer(final boolean direct, final int size) {
+  public synchronized ByteBuffer getBuffer(final boolean direct, final int size) {
+    bufferAllocations.incrementAndGet();
     ByteBuffer buffer = allocator.getBuffer(direct, size);
-    final ByteBufferAllocationStacktraceException ex = ByteBufferAllocationStacktraceException.create();
+    final ByteBufferAllocationStacktraceException ex =
+        ByteBufferAllocationStacktraceException.create();
     final Key key = new Key(buffer);
     allocated.put(key, ex);
     LOG.debug("Creating ByteBuffer:{} size {} {}", key.hashCode(), size, buffer, ex);
     return buffer;
   }
 
   @Override
-  public void putBuffer(ByteBuffer b) throws ReleasingUnallocatedByteBufferException {
+  public synchronized void putBuffer(ByteBuffer b)
+      throws ReleasingUnallocatedByteBufferException {
+
+    bufferReleases.incrementAndGet();
     Objects.requireNonNull(b);
     final Key key = new Key(b);
     LOG.debug("Releasing ByteBuffer: {}: {}", key.hashCode(), b);
     if (allocated.remove(key) == null) {
-      throw new ReleasingUnallocatedByteBufferException();
+      throw new ReleasingUnallocatedByteBufferException(b);
     }
     allocator.putBuffer(b);
     // Clearing the buffer so subsequent access would probably generate errors
     b.clear();
   }
 
+  /**
+   * Check if the buffer is in the pool.
+   * @param b buffer
+   * @return true if the buffer is in the pool
+   */
+  public boolean containsBuffer(ByteBuffer b) {
+    Objects.requireNonNull(b);
+    final Key key = new Key(b);
+    return allocated.containsKey(key);
+  }
+
+  /**
+   * Get the number of allocated buffers.
+   * @return number of allocated buffers
+   */
+  public int size() {
+    return allocated.size();
+  }
+
   /**
    * Expect all buffers to be released -if not, log unreleased ones
    * and then raise an exception with the stack trace of the first
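Together, the counters, containsBuffer() and size() make the tracking pool usable as a leak detector in tests. A minimal lifecycle sketch using only methods visible in this diff plus the wrap() factory the test suite calls; the wrapped ElasticByteBufferPool is one the tests already import:

    TrackingByteBufferPool pool = TrackingByteBufferPool.wrap(new ElasticByteBufferPool());
    ByteBuffer b = pool.getBuffer(false, 4096);  // counted and recorded with a stack trace
    boolean tracked = pool.containsBuffer(b);    // true while the buffer is unreleased
    pool.putBuffer(b);                           // releases; a second putBuffer(b) would throw
                                                 // ReleasingUnallocatedByteBufferException
    pool.close();                                // throws LeakedByteBufferException if size() > 0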

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java

Lines changed: 65 additions & 3 deletions
@@ -44,6 +44,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.TrackingByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -52,13 +53,16 @@
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
+import static org.apache.hadoop.fs.StreamCapabilities.VECTOREDIO_BUFFERS_SLICED;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.io.Sizes.S_128K;
+import static org.apache.hadoop.io.Sizes.S_4K;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
@@ -74,7 +78,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
 
-  public static final int DATASET_LEN = 64 * 1024;
+  public static final int DATASET_LEN = S_128K;
   protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
   protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
 
@@ -91,6 +95,8 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac
 
   private final String bufferType;
 
+  private final boolean isDirect;
+
   /**
    * Path to the vector file.
    */
@@ -110,7 +116,7 @@ public static List<String> params() {
 
   protected AbstractContractVectoredReadTest(String bufferType) {
     this.bufferType = bufferType;
-    final boolean isDirect = !"array".equals(bufferType);
+    this.isDirect = !"array".equals(bufferType);
     this.allocate = size -> pool.getBuffer(isDirect, size);
   }
 
@@ -619,4 +625,60 @@ protected <T extends Throwable> void verifyExceptionalVectoredRead(
       });
     }
   }
+
+  @Test
+  public void testBufferSlicing() throws Throwable {
+    describe("Test buffer slicing behavior in vectored IO");
+
+    final int numBuffers = 8;
+    final int bufferSize = S_4K;
+    long offset = 0;
+    final List<FileRange> fileRanges = new ArrayList<>();
+    for (int i = 0; i < numBuffers; i++) {
+      fileRanges.add(FileRange.createFileRange(offset, bufferSize));
+      // increment and add a non-binary-aligned gap, so as to force
+      // offsets to be misaligned with possible page sizes.
+      offset += bufferSize + 4000;
+    }
+    TrackingByteBufferPool pool = TrackingByteBufferPool.wrap(getPool());
+    int unknownBuffers = 0;
+    boolean slicing;
+    try (FSDataInputStream in = openVectorFile()) {
+      slicing = in.hasCapability(VECTOREDIO_BUFFERS_SLICED);
+      LOG.info("Slicing is {} for vectored IO with stream {}", slicing, in);
+      in.readVectored(fileRanges, s -> pool.getBuffer(isDirect, s), pool::putBuffer);
+
+      // check that all buffers are from the pool, unless they are sliced.
+      for (FileRange res : fileRanges) {
+        CompletableFuture<ByteBuffer> data = res.getData();
+        ByteBuffer buffer = awaitFuture(data);
+        Assertions.assertThat(buffer)
+            .describedAs("Buffer must not be null")
+            .isNotNull();
+        Assertions.assertThat(slicing || pool.containsBuffer(buffer))
+            .describedAs("Buffer must be from the pool")
+            .isTrue();
+        try {
+          pool.putBuffer(buffer);
+        } catch (TrackingByteBufferPool.ReleasingUnallocatedByteBufferException e) {
+          // this can happen if the buffer was sliced, as it is not in the pool.
+          if (!slicing) {
+            throw e;
+          }
+          LOG.info("Sliced buffer detected: {}", buffer);
+          unknownBuffers++;
+        }
+      }
+    }
+    try {
+      pool.close();
+    } catch (TrackingByteBufferPool.LeakedByteBufferException e) {
+      if (!slicing) {
+        throw e;
+      }
+      LOG.info("Slicing is enabled; we saw leaked buffers: {} after {} releases of unknown buffers",
+          e.getCount(), unknownBuffers);
+    }
+
+  }
 }
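For the checksum filesystem, the reason slicing shows up at all is that its vectored read implementation may read one larger checksummed range into a single buffer and return per-range slices of it; each slice is a new ByteBuffer object the tracking pool has never seen. An illustrative fragment of that failure mode, assuming a tracking pool as above:

    ByteBuffer pooled = pool.getBuffer(false, S_4K);
    ByteBuffer slice = pooled.slice();   // distinct object over the same backing memory
    pool.containsBuffer(slice);          // false: the tracker never saw this instance
    pool.putBuffer(slice);               // throws ReleasingUnallocatedByteBufferException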

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java

Lines changed: 7 additions & 0 deletions
@@ -74,6 +74,13 @@ public void testChecksumValidationDuringVectoredReadSmallFile() throws Exception
     validateCheckReadException(testPath, length, smallFileRanges);
   }
 
+  /**
+   * Verify that checksum validation works through vectored reads.
+   * @param testPath path to the file to be tested
+   * @param length length of the file to be created
+   * @param ranges ranges to be read from the file
+   * @throws Exception any exception other than ChecksumException
+   */
   private void validateCheckReadException(Path testPath,
       int length,
       List<FileRange> ranges) throws Exception {
