Skip to content

Commit 7274e4b

Browse files
committed
Add FSST support
1 parent bab5580 commit 7274e4b

21 files changed

+3095
-4
lines changed

cpp/src/parquet/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ set(PARQUET_SRCS
171171
encryption/internal_file_encryptor.cc
172172
exception.cc
173173
file_reader.cc
174+
thirdparty/fsst/libfsst.cpp
175+
thirdparty/fsst/fsst_avx512.cpp
174176
file_writer.cc
175177
geospatial/statistics.cc
176178
geospatial/util_internal.cc

cpp/src/parquet/column_reader.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -862,7 +862,8 @@ class ColumnReaderImplBase {
862862
case Encoding::RLE:
863863
case Encoding::DELTA_BINARY_PACKED:
864864
case Encoding::DELTA_BYTE_ARRAY:
865-
case Encoding::DELTA_LENGTH_BYTE_ARRAY: {
865+
case Encoding::DELTA_LENGTH_BYTE_ARRAY:
866+
case Encoding::FSST: {
866867
auto decoder = MakeTypedDecoder<DType>(encoding, descr_, pool_);
867868
current_decoder_ = decoder.get();
868869
decoders_[static_cast<int>(encoding)] = std::move(decoder);

cpp/src/parquet/decoder.cc

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121
#include <cstdint>
2222
#include <cstdlib>
2323
#include <cstring>
24+
#include <iostream>
2425
#include <limits>
2526
#include <memory>
2627
#include <string>
2728
#include <string_view>
2829
#include <type_traits>
2930
#include <utility>
3031
#include <vector>
32+
#include <deque>
3133

3234
#include "arrow/array.h"
3335
#include "arrow/array/builder_binary.h"
@@ -51,6 +53,7 @@
5153
#include "parquet/exception.h"
5254
#include "parquet/platform.h"
5355
#include "parquet/schema.h"
56+
#include "parquet/thirdparty/fsst/fsst.h"
5457
#include "parquet/types.h"
5558

5659
#ifdef _MSC_VER
@@ -2307,6 +2310,258 @@ class ByteStreamSplitDecoder<FLBAType> : public ByteStreamSplitDecoderBase<FLBAT
23072310
}
23082311
};
23092312

2313+
// ----------------------------------------------------------------------
2314+
2315+
class FsstDecoder : public DecoderImpl, virtual public TypedDecoder<ByteArrayType> {
2316+
public:
2317+
using Base = DecoderImpl;
2318+
using Base::num_values_;
2319+
2320+
explicit FsstDecoder(const ColumnDescriptor* descr,
2321+
::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
2322+
: DecoderImpl(descr, Encoding::FSST), pool_(pool) {}
2323+
2324+
void SetData(int num_values, const uint8_t* data, int len) override {
2325+
const auto header_size = static_cast<int>(sizeof(fsst_decoder_t));
2326+
if (len < header_size) {
2327+
throw ParquetException("FSST page too small to contain decoder header");
2328+
}
2329+
num_values_ = num_values;
2330+
memcpy(&decoder_, data, sizeof(fsst_decoder_t));
2331+
next_ = data + header_size;
2332+
remaining_bytes_ = len - header_size;
2333+
decode_buffer_size_ = 0;
2334+
}
2335+
2336+
int Decode(ByteArray* buffer, int max_values) override {
2337+
max_values = std::min(max_values, num_values_);
2338+
if (max_values == 0) {
2339+
return 0;
2340+
}
2341+
2342+
const int64_t estimated_output_size =
2343+
decode_buffer_size_ +
2344+
static_cast<int64_t>(remaining_bytes_) * kMaxDecompressionExpansion;
2345+
EnsureDecodeBuffer(estimated_output_size);
2346+
2347+
int decoded = 0;
2348+
2349+
while (decoded < max_values) {
2350+
if (ARROW_PREDICT_FALSE(remaining_bytes_ < static_cast<int>(kLengthPrefixSize))) {
2351+
throw ParquetException("FSST data truncated before length prefix");
2352+
}
2353+
2354+
const uint32_t compressed_len = SafeLoadAs<uint32_t>(next_);
2355+
next_ += kLengthPrefixSize;
2356+
remaining_bytes_ -= static_cast<int>(kLengthPrefixSize);
2357+
2358+
if (ARROW_PREDICT_FALSE(compressed_len > static_cast<uint32_t>(remaining_bytes_))) {
2359+
throw ParquetException("FSST compressed length exceeds available data");
2360+
}
2361+
2362+
const uint8_t* compressed_ptr = next_;
2363+
next_ += compressed_len;
2364+
remaining_bytes_ -= static_cast<int>(compressed_len);
2365+
2366+
uint8_t* value_ptr = nullptr;
2367+
const size_t decompressed_len =
2368+
DecompressValue(compressed_ptr, compressed_len, &value_ptr);
2369+
2370+
buffer[decoded].ptr = value_ptr;
2371+
buffer[decoded].len = static_cast<uint32_t>(decompressed_len);
2372+
decode_buffer_size_ += static_cast<int64_t>(decompressed_len);
2373+
++decoded;
2374+
}
2375+
2376+
num_values_ -= decoded;
2377+
return decoded;
2378+
}
2379+
2380+
int DecodeSpaced(ByteArray* buffer, int num_values, int null_count,
2381+
const uint8_t* valid_bits,
2382+
int64_t valid_bits_offset) override {
2383+
if (null_count == 0) {
2384+
return Decode(buffer, num_values);
2385+
}
2386+
2387+
const int values_to_decode = num_values - null_count;
2388+
temp_values_.resize(values_to_decode);
2389+
const int decoded = Decode(temp_values_.data(), values_to_decode);
2390+
if (ARROW_PREDICT_FALSE(decoded != values_to_decode)) {
2391+
throw ParquetException("Expected to decode ", values_to_decode,
2392+
" values but decoded ", decoded, " values.");
2393+
}
2394+
2395+
int value_index = 0;
2396+
for (int i = 0; i < num_values; ++i) {
2397+
if (bit_util::GetBit(valid_bits, valid_bits_offset + i)) {
2398+
buffer[i] = temp_values_[value_index++];
2399+
} else {
2400+
buffer[i].ptr = nullptr;
2401+
buffer[i].len = 0;
2402+
}
2403+
}
2404+
2405+
return value_index;
2406+
}
2407+
2408+
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2409+
int64_t valid_bits_offset,
2410+
typename EncodingTraits<ByteArrayType>::Accumulator* builder) override {
2411+
int values_decoded = 0;
2412+
PARQUET_THROW_NOT_OK(
2413+
DecodeArrowDense(num_values, null_count, valid_bits, valid_bits_offset, builder,
2414+
&values_decoded));
2415+
return values_decoded;
2416+
}
2417+
2418+
int DecodeArrow(int num_values, int null_count, const uint8_t* valid_bits,
2419+
int64_t valid_bits_offset,
2420+
typename EncodingTraits<ByteArrayType>::DictAccumulator* builder) override {
2421+
int values_decoded = 0;
2422+
PARQUET_THROW_NOT_OK(
2423+
DecodeArrowDict(num_values, null_count, valid_bits, valid_bits_offset, builder,
2424+
&values_decoded));
2425+
return values_decoded;
2426+
}
2427+
2428+
private:
2429+
Status DecodeArrowDense(int num_values, int null_count, const uint8_t* valid_bits,
2430+
int64_t valid_bits_offset,
2431+
typename EncodingTraits<ByteArrayType>::Accumulator* out,
2432+
int* out_values_decoded) {
2433+
const int values_to_decode = num_values - null_count;
2434+
temp_values_.resize(values_to_decode);
2435+
2436+
const int decoded = Decode(temp_values_.data(), values_to_decode);
2437+
if (ARROW_PREDICT_FALSE(decoded != values_to_decode)) {
2438+
throw ParquetException("Expected to decode ", values_to_decode,
2439+
" values but decoded ", decoded, " values.");
2440+
}
2441+
2442+
auto visit_binary_helper = [&](auto* helper) {
2443+
auto* values_ptr = temp_values_.data();
2444+
int value_index = 0;
2445+
2446+
RETURN_NOT_OK(
2447+
VisitBitRuns(valid_bits, valid_bits_offset, num_values,
2448+
[&](int64_t position, int64_t run_length, bool is_valid) {
2449+
if (is_valid) {
2450+
for (int64_t i = 0; i < run_length; ++i) {
2451+
const auto& value = values_ptr[value_index++];
2452+
RETURN_NOT_OK(helper->AppendValue(
2453+
value.ptr, static_cast<int32_t>(value.len)));
2454+
}
2455+
} else {
2456+
RETURN_NOT_OK(helper->AppendNulls(run_length));
2457+
}
2458+
return Status::OK();
2459+
}));
2460+
2461+
*out_values_decoded = decoded;
2462+
return Status::OK();
2463+
};
2464+
2465+
return DispatchArrowBinaryHelper<ByteArrayType>(
2466+
out, num_values, /*estimated_data_length=*/{}, visit_binary_helper);
2467+
}
2468+
2469+
Status DecodeArrowDict(int num_values, int null_count, const uint8_t* valid_bits,
2470+
int64_t valid_bits_offset,
2471+
typename EncodingTraits<ByteArrayType>::DictAccumulator* builder,
2472+
int* out_values_decoded) {
2473+
const int values_to_decode = num_values - null_count;
2474+
temp_values_.resize(values_to_decode);
2475+
2476+
const int decoded = Decode(temp_values_.data(), values_to_decode);
2477+
if (ARROW_PREDICT_FALSE(decoded != values_to_decode)) {
2478+
throw ParquetException("Expected to decode ", values_to_decode,
2479+
" values but decoded ", decoded, " values.");
2480+
}
2481+
2482+
RETURN_NOT_OK(builder->Reserve(num_values));
2483+
2484+
int value_index = 0;
2485+
RETURN_NOT_OK(VisitBitRuns(
2486+
valid_bits, valid_bits_offset, num_values,
2487+
[&](int64_t position, int64_t run_length, bool is_valid) {
2488+
if (is_valid) {
2489+
for (int64_t i = 0; i < run_length; ++i) {
2490+
const auto& value = temp_values_[value_index++];
2491+
RETURN_NOT_OK(builder->Append(value.ptr, static_cast<int32_t>(value.len)));
2492+
}
2493+
} else {
2494+
RETURN_NOT_OK(builder->AppendNulls(run_length));
2495+
}
2496+
return Status::OK();
2497+
}));
2498+
2499+
*out_values_decoded = decoded;
2500+
return Status::OK();
2501+
}
2502+
2503+
uint8_t* EnsureDecodeBuffer(int64_t capacity) {
2504+
const int64_t min_capacity =
2505+
std::max<int64_t>(capacity, kInitialDecodeBufferSize);
2506+
const int64_t target = ::arrow::bit_util::NextPower2(min_capacity);
2507+
2508+
if (!decode_buffer_) {
2509+
PARQUET_ASSIGN_OR_THROW(
2510+
decode_buffer_, ::arrow::AllocateResizableBuffer(target, pool_));
2511+
} else if (decode_buffer_->size() < target) {
2512+
PARQUET_THROW_NOT_OK(decode_buffer_->Resize(target, false));
2513+
}
2514+
return decode_buffer_->mutable_data();
2515+
}
2516+
2517+
size_t DecompressValue(const uint8_t* compressed_ptr, uint32_t compressed_len,
2518+
uint8_t** value_ptr) {
2519+
EnsureDecodeBuffer(decode_buffer_size_ +
2520+
OutputUpperBound(compressed_len));
2521+
2522+
while (true) {
2523+
uint8_t* destination = decode_buffer_->mutable_data() + decode_buffer_size_;
2524+
const size_t available =
2525+
static_cast<size_t>(decode_buffer_->size() - decode_buffer_size_);
2526+
2527+
const size_t decompressed =
2528+
fsst_decompress(&decoder_, compressed_len, compressed_ptr, available,
2529+
destination);
2530+
2531+
if (decompressed > 0 || compressed_len == 0) {
2532+
*value_ptr = destination;
2533+
return decompressed;
2534+
}
2535+
2536+
int64_t new_capacity = std::max<int64_t>(
2537+
decode_buffer_->size() * 2,
2538+
decode_buffer_size_ + OutputUpperBound(compressed_len));
2539+
if (new_capacity <= decode_buffer_->size()) {
2540+
throw ParquetException("FSST decompression failed");
2541+
}
2542+
EnsureDecodeBuffer(new_capacity);
2543+
}
2544+
}
2545+
2546+
static int64_t OutputUpperBound(uint32_t compressed_len) {
2547+
const int64_t expanded =
2548+
static_cast<int64_t>(compressed_len) * kMaxDecompressionExpansion;
2549+
return std::max<int64_t>(expanded, kInitialDecodeBufferSize);
2550+
}
2551+
2552+
static constexpr size_t kLengthPrefixSize = sizeof(uint32_t);
2553+
static constexpr int64_t kInitialDecodeBufferSize = 1024;
2554+
static constexpr int64_t kMaxDecompressionExpansion = 8;
2555+
2556+
fsst_decoder_t decoder_{};
2557+
const uint8_t* next_ = nullptr;
2558+
int remaining_bytes_ = 0;
2559+
::arrow::MemoryPool* pool_;
2560+
std::shared_ptr<::arrow::ResizableBuffer> decode_buffer_;
2561+
int64_t decode_buffer_size_ = 0;
2562+
std::vector<ByteArray> temp_values_;
2563+
};
2564+
23102565
} // namespace
23112566

23122567
// ----------------------------------------------------------------------
@@ -2373,6 +2628,13 @@ std::unique_ptr<Decoder> MakeDecoder(Type::type type_num, Encoding::type encodin
23732628
throw ParquetException(
23742629
"DELTA_BYTE_ARRAY only supports BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY");
23752630
}
2631+
} else if (encoding == Encoding::FSST) {
2632+
switch (type_num) {
2633+
case Type::BYTE_ARRAY:
2634+
return std::make_unique<FsstDecoder>(descr, pool);
2635+
default:
2636+
throw ParquetException("FSST encoding only supports BYTE_ARRAY");
2637+
}
23762638
} else if (encoding == Encoding::DELTA_LENGTH_BYTE_ARRAY) {
23772639
if (type_num == Type::BYTE_ARRAY) {
23782640
return std::make_unique<DeltaLengthByteArrayDecoder>(descr, pool);

0 commit comments

Comments
 (0)