ChecksumFileSystem.java
@@ -29,6 +29,7 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -57,6 +58,7 @@
import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.io.Sizes.S_0;

/****************************************************************
* Abstract Checksumed FileSystem.
@@ -101,6 +103,14 @@ public void setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;
}

/**
* Is checksum verification enabled?
* @return true if files are to be verified through checksums.
*/
public boolean getVerifyChecksum() {
return verifyChecksum;
}

@Override
public void setWriteChecksum(boolean writeChecksum) {
this.writeChecksum = writeChecksum;
@@ -165,7 +175,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {

/*******************************************************
* For open()'s FSInputStream
* It verifies that data matches checksums.
* It verifies that data matches checksums, but only when the
* data file has an associated checksum file.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker implements
IOStatisticsSource, StreamCapabilities {
@@ -426,8 +437,18 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
return data;
}

/**
* Turn off range merging to make buffer recycling more likely (but not guaranteed).
* @return 0, always
*/
@Override
public int maxReadSizeForVectorReads() {
return S_0;
}

/**
* Vectored read.
* <p>
* If the file has no checksums: delegate to the underlying stream.
* If the file is checksummed: calculate the checksum ranges as
* well as the data ranges, read both, and validate the checksums
@@ -448,10 +469,12 @@ public void readVectored(final List<? extends FileRange> ranges,
final Consumer<ByteBuffer> release) throws IOException {

// If the stream doesn't have checksums, just delegate.
if (sums == null) {
if (delegateVectorReadsToInner()) {
LOG.debug("No checksums for vectored read, delegating to inner stream");
datas.readVectored(ranges, allocate);
return;
}
LOG.debug("Checksum vectored read for {} ranges", ranges.size());
final long length = getFileLength();
final List<? extends FileRange> sorted = validateAndSortRanges(ranges,
Optional.of(length));
@@ -489,9 +512,27 @@ public void readVectored(final List<? extends FileRange> ranges,
}
}

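For context, here is a minimal sketch of how a client drives the vectored read path this patch extends. The file path and range offsets are hypothetical; `FileSystem.getLocal()` returns the checksummed local filesystem, so these reads go through the `ChecksumFSInputChecker` above.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

public class VectoredReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // LocalFileSystem extends ChecksumFileSystem, so this exercises
    // the checksum-validating vectored read path.
    FileSystem fs = FileSystem.getLocal(conf);
    Path path = new Path("/tmp/example.dat");   // hypothetical file

    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 4_096),
        FileRange.createFileRange(65_536, 4_096));

    try (FSDataInputStream in = fs.open(path)) {
      // The stream reads the matching checksum ranges internally
      // and validates each data range before completing its future.
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange range : ranges) {
        ByteBuffer data = FutureIO.awaitFuture(range.getData());
        System.out.println("read " + data.remaining() + " bytes");
      }
    }
  }
}
```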
private boolean delegateVectorReadsToInner() {
Contributor: nit: this name seems a bit off to me. Also javadocs.

return sums == null;
}

/**
* For the sliced-buffer probe, declare whether buffer slicing
* may take place; all other probes go to the inner stream.
* @param capability string to query the stream support for.
* @return true for sliced vector IO if checksum validation
* is taking place; false if there are no checksums to validate.
* All other probes are passed to the wrapped stream.
*/
@Override
public boolean hasCapability(String capability) {
return datas.hasCapability(capability);
switch (capability.toLowerCase(Locale.ENGLISH)) {
// slicing can take place during coalescing and checksumming
case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED:
return !delegateVectorReadsToInner();
default:
return datas.hasCapability(capability);
}
}
}

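Because a sliced buffer is a view over an allocated buffer, a caller that pools buffers needs this probe before recycling anything. A minimal sketch, with a hypothetical helper class:

```java
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.StreamCapabilities;

final class BufferPoolGuard {
  private BufferPoolGuard() {}

  /**
   * Hypothetical helper: may buffers filled by readVectored()
   * be returned to a caller-managed pool once consumed?
   */
  static boolean safeToRecycle(FSDataInputStream in) {
    // A sliced buffer shares storage with a pooled buffer;
    // recycling it would hand the same memory to two owners.
    return !in.hasCapability(StreamCapabilities.VECTOREDIO_BUFFERS_SLICED);
  }
}
```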
@@ -1142,6 +1183,9 @@ public boolean hasPathCapability(final Path path, final String capability)
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
return false;
case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED:
return getVerifyChecksum();

default:
return super.hasPathCapability(p, capability);
}
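The path-level probe above lets a caller ask the same question before even opening the file. A brief sketch, assuming a hypothetical helper method:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

static boolean mayReturnSlicedBuffers(Path p, Configuration conf)
    throws IOException {
  FileSystem fs = p.getFileSystem(conf);
  // True when checksum verification is enabled for this filesystem.
  return fs.hasPathCapability(p, StreamCapabilities.VECTOREDIO_BUFFERS_SLICED);
}
```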
LocalFileSystem.java
@@ -27,6 +27,8 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.LocalFileSystemConfigKeys.LOCAL_FS_VERIFY_CHECKSUM;

/****************************************************************
* Implement the FileSystem API for the checksumed local filesystem.
*
@@ -50,6 +52,10 @@ public void initialize(URI name, Configuration conf) throws IOException {
if (!scheme.equals(fs.getUri().getScheme())) {
swapScheme = scheme;
}
final boolean checksum = conf.getBoolean(LOCAL_FS_VERIFY_CHECKSUM, true);
setVerifyChecksum(checksum);
LOG.debug("Checksum verification enabled={}", checksum);

}

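With the key wired into initialize(), verification can now be disabled per instance through configuration. A minimal sketch, assuming the key keeps the name `file.verify-checksum` (a rename is proposed below):

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;

static LocalFileSystem openWithoutChecksums() throws IOException {
  Configuration conf = new Configuration();
  // Key declared below in LocalFileSystemConfigKeys.
  conf.setBoolean("file.verify-checksum", false);
  // newInstanceLocal sidesteps the FileSystem cache, so the new
  // setting is guaranteed to be picked up by initialize().
  LocalFileSystem fs = FileSystem.newInstanceLocal(conf);
  assert !fs.getVerifyChecksum();  // reads will skip CRC validation
  return fs;
}
```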
/**
LocalFileSystemConfigKeys.java
@@ -42,5 +42,12 @@ public class LocalFileSystemConfigKeys extends CommonConfigurationKeys {
public static final String LOCAL_FS_CLIENT_WRITE_PACKET_SIZE_KEY =
"file.client-write-packet-size";
public static final int LOCAL_FS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;

Contributor: Unnecessary change?

/**
* Verify checksums on read; default is true.
* <p>
* {@value}.
*/
public static final String LOCAL_FS_VERIFY_CHECKSUM = "file.verify-checksum";
Contributor Author: rename to fs.file.checksum.verify and declare in core-default to make it more visible.

}

RawLocalFileSystem.java
@@ -72,6 +72,7 @@
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
import static org.apache.hadoop.fs.VectoredReadUtils.hasVectorIOCapability;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -293,15 +294,12 @@ public FileDescriptor getFileDescriptor() throws IOException {

@Override
public boolean hasCapability(String capability) {
// a bit inefficient, but intended to make it easier to add
// new capabilities.
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.IOSTATISTICS_CONTEXT:
case StreamCapabilities.VECTOREDIO:
return true;
default:
return false;
return hasVectorIOCapability(capability);
Contributor: Does this actually change the logic? It seems like hasVectorIOCapability checks StreamCapabilities.VECTOREDIO, just like the old code did.

}
}

@@ -400,7 +398,9 @@ private void initiateRead() {
for(int i = 0; i < ranges.size(); ++i) {
FileRange range = ranges.get(i);
buffers[i] = allocateRelease.getBuffer(false, range.getLength());
channel.read(buffers[i], range.getOffset(), i, this);
final ByteBuffer buffer = buffers[i];
Contributor: Any specific reason for this refactoring and making it final?
Contributor Author: lets me refer to it in the debug() statement.

LOG.debug("Reading file range {} into buffer {}", range, System.identityHashCode(buffer));
channel.read(buffer, range.getOffset(), i, this);
}
}

@@ -416,6 +416,8 @@ private void initiateRead() {
public void completed(Integer result, Integer rangeIndex) {
FileRange range = ranges.get(rangeIndex);
ByteBuffer buffer = buffers[rangeIndex];
LOG.debug("Completed read range {} into buffer {} outcome={} remaining={}",
range, System.identityHashCode(buffer), result, buffer.remaining());
if (result == -1) {
// no data was read back.
failed(new EOFException("Read past End of File"), rangeIndex);
StreamCapabilities.java
@@ -86,6 +86,15 @@ public interface StreamCapabilities {
*/
String VECTOREDIO = "in:readvectored";

/**
* When performing vectored IO operations, are the buffers returned by readVectored()
* potentially sliced subsets of the buffers allocated by the allocate() function
* passed in with the read requests?
*/
String VECTOREDIO_BUFFERS_SLICED = "fs.capability.vectoredio.sliced";

/**
* Stream abort() capability implemented by {@link Abortable#abort()}.
* This matches the Path Capability
VectoredReadUtils.java
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
@@ -476,6 +477,22 @@ public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset,
return readData;
}

/**
* Default vector IO probes.
* These are the capabilities which streams that rely on the default
* vector IO implementations should declare when probed.
* @param capability capability to probe for.
* @return true if the given capability holds for vectored IO features.
*/
public static boolean hasVectorIOCapability(String capability) {
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.VECTOREDIO:
return true;
default:
return false;
}
}

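A sketch of the intended use: a stream that leaves vectored reads to the default implementation routes its capability probe through this helper, mirroring the RawLocalFileSystem change above. The class name is hypothetical:

```java
import java.util.Locale;

import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.VectoredReadUtils;

// Hypothetical stream that leaves readVectored() to the default implementation.
class SketchStream implements StreamCapabilities {
  @Override
  public boolean hasCapability(String capability) {
    switch (capability.toLowerCase(Locale.ENGLISH)) {
      case StreamCapabilities.IOSTATISTICS:
        return true;  // stream-specific capabilities first
      default:
        // Shared probe: currently answers true only for VECTOREDIO.
        return VectoredReadUtils.hasVectorIOCapability(capability);
    }
  }
}
```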
/**
* private constructor.
*/