Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions cpp/src/arrow/util/compression_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <random>
#include <string>
#include <vector>
#include <iostream>

#include "arrow/result.h"
#include "arrow/util/compression.h"
Expand Down Expand Up @@ -76,7 +77,7 @@ int64_t StreamingCompress(Codec* codec, const std::vector<uint8_t>& data,
int64_t input_len = data.size();
int64_t compressed_size = 0;

std::vector<uint8_t> output_buffer(1 << 20); // 1 MB
std::vector<uint8_t> output_buffer(8 << 20); // 8 MB

while (input_len > 0) {
auto result = *compressor->Compress(input_len, input, output_buffer.size(),
Expand Down Expand Up @@ -149,8 +150,8 @@ static void StreamingDecompression(
const uint8_t* input = compressed_data.data();
int64_t input_len = compressed_data.size();
int64_t decompressed_size = 0;

std::vector<uint8_t> output_buffer(1 << 20); // 1 MB
std::cout << "input: " << input_len << std::endl;
std::vector<uint8_t> output_buffer(8 << 20); // 8 MB
while (!decompressor->IsFinished()) {
auto result = *decompressor->Decompress(input_len, input, output_buffer.size(),
output_buffer.data());
Expand All @@ -162,7 +163,8 @@ static void StreamingDecompression(
output_buffer.resize(output_buffer.size() * 2);
}
}
ARROW_CHECK(decompressed_size == static_cast<int64_t>(data.size()));
std::cout << "OOO: " << decompressed_size << "|" << data.size() << std::endl;
//ARROW_CHECK(decompressed_size == static_cast<int64_t>(data.size()));
}
state.SetBytesProcessed(state.iterations() * data.size());
}
Expand Down
131 changes: 54 additions & 77 deletions cpp/src/arrow/util/compression_zlib_isal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cstring>
#include <limits>
#include <memory>
#include <iostream>

#include <isa-l/igzip_lib.h>
#include <zconf.h>
Expand Down Expand Up @@ -121,7 +122,7 @@ class GZipDecompressor : public Decompressor {

ret = isal_inflate(&stream_);

finished_ = (stream_.block_state == ISAL_BLOCK_FINISH);
finished_ = (stream_.block_state == ISAL_BLOCK_FINISH || input_len == 0);

ARROW_CHECK(ret == ISAL_DECOMP_OK);

Expand Down Expand Up @@ -153,24 +154,21 @@ class GZipCompressor : public Compressor {

~GZipCompressor() override {
if (initialized_) {
deflateEnd(&stream_);
isal_deflate_reset(&isal_stream_);
}
}

Status Init(GZipFormat::type format) {
DCHECK(!initialized_);
memset(&stream_, 0, sizeof(stream_));
memset(&isal_stream_, 0, sizeof(isal_stream_));

int ret;
// Initialize to run specified format
int window_bits = CompressionWindowBitsForFormat(format);
if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits,
compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) {
return ZlibError("zlib deflateInit failed: ");
} else {
initialized_ = true;
return Status::OK();
}
isal_deflate_init(&isal_stream_);
isal_stream_.gzip_flag = IGZIP_GZIP;
isal_stream_.end_of_stream = 0;
initialized_ = true;
return Status::OK();

}

Result<CompressResult> Compress(int64_t input_len, const uint8_t* input,
Expand All @@ -180,22 +178,20 @@ class GZipCompressor : public Compressor {
static constexpr auto input_limit =
static_cast<int64_t>(std::numeric_limits<uInt>::max());

stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit));
stream_.next_out = reinterpret_cast<Bytef*>(output);
stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
isal_stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
isal_stream_.avail_in = static_cast<uInt>(std::min(input_len, input_limit));
isal_stream_.next_out = reinterpret_cast<Bytef*>(output);
isal_stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));

int64_t ret = 0;
ret = deflate(&stream_, Z_NO_FLUSH);
if (ret == Z_STREAM_ERROR) {
return ZlibError("zlib compress failed: ");
}
if (ret == Z_OK) {
ret = isal_deflate(&isal_stream_);

if (ret == COMP_OK) {
// Some progress has been made
return CompressResult{input_len - stream_.avail_in, output_len - stream_.avail_out};
return CompressResult{input_len - isal_stream_.avail_in, output_len - isal_stream_.avail_out};
} else {
// No progress was possible
ARROW_CHECK_EQ(ret, Z_BUF_ERROR);
//ARROW_CHECK_EQ(ret, Z_BUF_ERROR);
return CompressResult{0, 0};
}
}
Expand All @@ -206,20 +202,18 @@ class GZipCompressor : public Compressor {
static constexpr auto input_limit =
static_cast<int64_t>(std::numeric_limits<uInt>::max());

stream_.avail_in = 0;
stream_.next_out = reinterpret_cast<Bytef*>(output);
stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
isal_stream_.avail_in = 0;
isal_stream_.next_out = reinterpret_cast<Bytef*>(output);
isal_stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));

int64_t ret = 0;
ret = deflate(&stream_, Z_SYNC_FLUSH);
if (ret == Z_STREAM_ERROR) {
return ZlibError("zlib flush failed: ");
}
ret = isal_deflate(&isal_stream_);

int64_t bytes_written;
if (ret == Z_OK) {
bytes_written = output_len - stream_.avail_out;
if (ret == COMP_OK) {
bytes_written = output_len - isal_stream_.avail_out;
} else {
ARROW_CHECK_EQ(ret, Z_BUF_ERROR);

bytes_written = 0;
}
// "If deflate returns with avail_out == 0, this function must be called
Expand All @@ -228,7 +222,7 @@ class GZipCompressor : public Compressor {
// with non-zero avail_out)."
// "Note that Z_BUF_ERROR is not fatal, and deflate() can be called again
// with more input and more output space to continue compressing."
return FlushResult{bytes_written, stream_.avail_out == 0};
return FlushResult{bytes_written, isal_stream_.avail_out == 0};
}

Result<EndResult> End(int64_t output_len, uint8_t* output) override {
Expand All @@ -237,25 +231,21 @@ class GZipCompressor : public Compressor {
static constexpr auto input_limit =
static_cast<int64_t>(std::numeric_limits<uInt>::max());

stream_.avail_in = 0;
stream_.next_out = reinterpret_cast<Bytef*>(output);
stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));
isal_stream_.avail_in = 0;
isal_stream_.next_out = reinterpret_cast<Bytef*>(output);
isal_stream_.avail_out = static_cast<uInt>(std::min(output_len, input_limit));

int64_t ret = 0;
ret = deflate(&stream_, Z_FINISH);
if (ret == Z_STREAM_ERROR) {
return ZlibError("zlib flush failed: ");
}
int64_t bytes_written = output_len - stream_.avail_out;
if (ret == Z_STREAM_END) {
ret = isal_deflate(&isal_stream_);

int64_t bytes_written = output_len - isal_stream_.avail_out;
if (ret == COMP_OK) {
// Flush complete, we can now end the stream
initialized_ = false;
ret = deflateEnd(&stream_);
if (ret == Z_OK) {
return EndResult{bytes_written, false};
} else {
return ZlibError("zlib end failed: ");
}
isal_deflate_reset(&isal_stream_);

return EndResult{bytes_written, false};

} else {
// Not everything could be flushed,
return EndResult{bytes_written, true};
Expand All @@ -268,6 +258,7 @@ class GZipCompressor : public Compressor {
}

z_stream stream_;
isal_zstream isal_stream_;
bool initialized_;
int compression_level_;
};
Expand Down Expand Up @@ -305,22 +296,17 @@ class GZipCodec : public Codec {

Status InitCompressor() {
EndDecompressor();
memset(&stream_, 0, sizeof(stream_));
memset(&isal_stream_, 0, sizeof(isal_stream_));

int ret;
// Initialize to run specified format
int window_bits = CompressionWindowBitsForFormat(format_);
if ((ret = deflateInit2(&stream_, Z_DEFAULT_COMPRESSION, Z_DEFLATED, window_bits,
compression_level_, Z_DEFAULT_STRATEGY)) != Z_OK) {
return ZlibErrorPrefix("zlib deflateInit failed: ", stream_.msg);
}
isal_deflate_init(&isal_stream_);
isal_stream_.end_of_stream = 0;
compressor_initialized_ = true;
return Status::OK();
}

void EndCompressor() {
if (compressor_initialized_) {
(void)deflateEnd(&stream_);
(void)isal_deflate_reset(&isal_stream_);
}
compressor_initialized_ = false;
}
Expand Down Expand Up @@ -375,7 +361,6 @@ class GZipCodec : public Codec {
ret = isal_inflate(&state_);
if (ret == ISAL_BLOCK_FINISH || ret == ISAL_DECOMP_OK) break;

printf("AAA");
// Failure, buffer was too small
return Status::IOError("Too small a buffer passed to GZipCodec. InputLength=",
input_length, " OutputLength=", output_buffer_length);
Expand All @@ -396,36 +381,27 @@ class GZipCodec : public Codec {
Status s = InitCompressor();
ARROW_CHECK_OK(s);
}
int64_t max_len = deflateBound(&stream_, static_cast<uLong>(input_length));
// int64_t max_len = deflateBound(&stream_, static_cast<uLong>(input_length));
// ARROW-3514: return a more pessimistic estimate to account for bugs
// in old zlib versions.
return max_len + 12;
return input_length + 12;
}

Result<int64_t> Compress(int64_t input_length, const uint8_t* input,
int64_t output_buffer_len, uint8_t* output) override {
if (!compressor_initialized_) {
RETURN_NOT_OK(InitCompressor());
}
stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
stream_.avail_in = static_cast<uInt>(input_length);
stream_.next_out = reinterpret_cast<Bytef*>(output);
stream_.avail_out = static_cast<uInt>(output_buffer_len);
isal_stream_.next_in = const_cast<Bytef*>(reinterpret_cast<const Bytef*>(input));
isal_stream_.avail_in = static_cast<uInt>(input_length);
isal_stream_.next_out = reinterpret_cast<Bytef*>(output);
isal_stream_.avail_out = static_cast<uInt>(output_buffer_len);

int64_t ret = 0;
if ((ret = deflate(&stream_, Z_FINISH)) != Z_STREAM_END) {
if (ret == Z_OK) {
// Will return Z_OK (and stream.msg NOT set) if stream.avail_out is too
// small
return Status::IOError("zlib deflate failed, output buffer too small");
}

return ZlibErrorPrefix("zlib deflate failed: ", stream_.msg);
}
std::cout << "HHHH\n";
ret = isal_deflate_stateless(&isal_stream_);

if (deflateReset(&stream_) != Z_OK) {
return ZlibErrorPrefix("zlib deflateReset failed: ", stream_.msg);
}
isal_deflate_reset(&isal_stream_);

// Actual output length
return output_buffer_len - stream_.avail_out;
Expand All @@ -447,6 +423,7 @@ class GZipCodec : public Codec {
// zlib is stateful and the z_stream state variable must be initialized
// before
z_stream stream_;
isal_zstream isal_stream_;
inflate_state state_;

// Realistically, this will always be GZIP, but we leave the option open to
Expand Down