Skip to content

PoC: Add API to allow to use zstd prepared dictionary #8128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 55 additions & 15 deletions parquet/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub trait Codec: Send {
pub struct CodecOptions {
/// Whether or not to fallback to other LZ4 older implementations on error in LZ4_HADOOP.
backward_compatible_lz4: bool,
/// Raw bytes of a shared dictionary for ZSTD, or `None` when no dictionary is configured.
///
/// NOTE(review): the visible `create_codec` hunk still passes `None` to `ZSTDCodec::new`,
/// so this value does not appear to reach the codec yet — confirm before relying on it.
prepared_dictionary: Option<Vec<u8>>
}

impl Default for CodecOptions {
Expand All @@ -90,12 +92,15 @@ impl Default for CodecOptions {
pub struct CodecOptionsBuilder {
/// Whether or not to fallback to other LZ4 older implementations on error in LZ4_HADOOP.
backward_compatible_lz4: bool,
/// Raw bytes of a shared dictionary for ZSTD, or `None` when no dictionary has been set.
///
/// `build` moves this value into `CodecOptions::prepared_dictionary`.
prepared_dictionary_zstd: Option<Vec<u8>>
}

impl Default for CodecOptionsBuilder {
fn default() -> Self {
Self {
backward_compatible_lz4: true,
prepared_dictionary_zstd: None,
}
}
}
Expand All @@ -114,9 +119,19 @@ impl CodecOptionsBuilder {
self
}

/// Set shared dictionary for ZSTD compression.
/// This allows to use a shared dictionary for ZSTD compression, potentially improving compression ratios
#[allow(dead_code)]
pub fn set_prepared_dictionary(mut self, value: Vec<u8>) -> CodecOptionsBuilder {
self.prepared_dictionary_zstd = Some(value);
self
}


pub fn build(self) -> CodecOptions {
CodecOptions {
backward_compatible_lz4: self.backward_compatible_lz4,
prepared_dictionary: self.prepared_dictionary_zstd,
}
}
}
Expand Down Expand Up @@ -179,7 +194,7 @@ pub fn create_codec(codec: CodecType, _options: &CodecOptions) -> Result<Option<
}
CodecType::ZSTD(level) => {
#[cfg(any(feature = "zstd", test))]
return Ok(Some(Box::new(ZSTDCodec::new(level))));
return Ok(Some(Box::new(ZSTDCodec::new(level,None ))));
Err(ParquetError::General(
"Disabled feature at compile time: zstd".into(),
))
Expand Down Expand Up @@ -505,41 +520,66 @@ pub use lz4_codec::*;
mod zstd_codec {
use std::io::{self, Write};

use zstd::dict::{DecoderDictionary, EncoderDictionary};

use crate::compression::{Codec, ZstdLevel};
use crate::errors::Result;

/// Codec for Zstandard compression algorithm.
pub struct ZSTDCodec {
pub struct ZSTDCodec<'a> {
level: ZstdLevel,
prepared_dictionary_decode: Option<DecoderDictionary<'a>>,
prepared_dictionary_encode: Option<EncoderDictionary<'a>>,
}

impl ZSTDCodec {
impl ZSTDCodec<'_> {
/// Creates new Zstandard compression codec.
pub(crate) fn new(level: ZstdLevel) -> Self {
Self { level }
pub(crate) fn new(level: ZstdLevel, prepared_dictionary: Option<Vec<u8>>) -> Self {
if let Some(dict) = &prepared_dictionary {
Self { level, prepared_dictionary_decode: Some(DecoderDictionary::copy(dict)), prepared_dictionary_encode: Some(EncoderDictionary::copy(dict, level.compression_level())) }
} else {
Self { level, prepared_dictionary_decode: None, prepared_dictionary_encode: None }
}
}
}

impl Codec for ZSTDCodec {
impl Codec for ZSTDCodec<'_> {
fn decompress(
&mut self,
input_buf: &[u8],
output_buf: &mut Vec<u8>,
_uncompress_size: Option<usize>,
) -> Result<usize> {
let mut decoder = zstd::Decoder::new(input_buf)?;
match io::copy(&mut decoder, output_buf) {
Ok(n) => Ok(n as usize),
Err(e) => Err(e.into()),
if let Some(dict) = &self.prepared_dictionary_decode {
let mut decoder = zstd::Decoder::with_prepared_dictionary(input_buf, &dict)?;
match io::copy(&mut decoder, output_buf) {
Ok(n) => Ok(n as usize),
Err(e) => Err(e.into())
}
} else {
let mut decoder = zstd::Decoder::new(input_buf)?;
match io::copy(&mut decoder, output_buf) {
Ok(n) => Ok(n as usize),
Err(e) => Err(e.into())
}
}
}

fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
let mut encoder = zstd::Encoder::new(output_buf, self.level.0)?;
encoder.write_all(input_buf)?;
match encoder.finish() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
if let Some(dict) = &self.prepared_dictionary_encode {
let mut encoder = zstd::Encoder::with_prepared_dictionary(output_buf, dict)?;
encoder.write_all(input_buf)?;
match encoder.finish() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
} else {
let mut encoder = zstd::Encoder::new(output_buf, self.level.0)?;
encoder.write_all(input_buf)?;
match encoder.finish() {
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
}
}
}
}
Expand Down
Loading