Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion encodings/alp/src/alp/compute/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl MaskKernel for ALP {
mask: &ArrayRef,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
let vortex_mask = Validity::Array(mask.not()?).to_mask(array.len());
let vortex_mask = Validity::Array(mask.not()?).execute_mask(array.len(), ctx)?;
let masked_encoded = array.encoded().clone().mask(mask.clone())?;
let masked_patches = array
.patches()
Expand Down
10 changes: 4 additions & 6 deletions encodings/datetime-parts/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,19 +147,17 @@ mod test {
))
.unwrap();

assert_eq!(
date_times.validity_mask().unwrap(),
validity.to_mask(date_times.len())
);

let mut ctx = ExecutionCtx::new(VortexSession::empty());

assert!(date_times.validity()?.mask_eq(&validity, &mut ctx)?);

let primitive_values = decode_to_temporal(&date_times, &mut ctx)?
.temporal_values()
.clone()
.execute::<PrimitiveArray>(&mut ctx)?;

assert_arrays_eq!(primitive_values, milliseconds);
assert_eq!(primitive_values.validity(), &validity);
assert!(primitive_values.validity().mask_eq(&validity, &mut ctx)?);
Ok(())
}
}
21 changes: 18 additions & 3 deletions encodings/datetime-parts/src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ impl TryFrom<TemporalArray> for DateTimePartsArray {
mod tests {
use rstest::rstest;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::TemporalArray;
use vortex_array::extension::datetime::TimeUnit;
Expand Down Expand Up @@ -110,8 +112,21 @@ mod tests {
seconds,
subseconds,
} = split_temporal(temporal_array).unwrap();
assert_eq!(days.to_primitive().validity(), &validity);
assert_eq!(seconds.to_primitive().validity(), &Validity::NonNullable);
assert_eq!(subseconds.to_primitive().validity(), &Validity::NonNullable);

let mut ctx = LEGACY_SESSION.create_execution_ctx();
assert!(
days.to_primitive()
.validity()
.mask_eq(&validity, &mut ctx)
.unwrap()
);
assert!(matches!(
seconds.to_primitive().validity(),
Validity::NonNullable
));
assert!(matches!(
subseconds.to_primitive().validity(),
Validity::NonNullable
));
}
}
9 changes: 8 additions & 1 deletion encodings/fastlanes/src/rle/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ mod tests {
use vortex_array::ArrayContext;
use vortex_array::DynArray;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::assert_arrays_eq;
use vortex_array::dtype::DType;
Expand Down Expand Up @@ -397,7 +399,12 @@ mod tests {
let sliced_array = rle_array.slice(1..4).unwrap();
let validity_mask = sliced_array.validity_mask().unwrap();

let expected_mask = Validity::from_iter([false, true, false]).to_mask(3);
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let expected_mask = Validity::from_iter([false, true, false])
.execute_mask(3, &mut ctx)
.unwrap();
assert_eq!(validity_mask.len(), expected_mask.len());
assert_eq!(validity_mask, expected_mask);
assert_eq!(validity_mask.len(), expected_mask.len());
assert_eq!(validity_mask, expected_mask);
}
Expand Down
4 changes: 2 additions & 2 deletions encodings/pco/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub fn vortex_pco::Pco::deserialize(bytes: &[u8], _dtype: &vortex_array::dtype::

pub fn vortex_pco::Pco::dtype(array: &vortex_pco::PcoArray) -> &vortex_array::dtype::DType

pub fn vortex_pco::Pco::execute(array: &Self::Array, _ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>
pub fn vortex_pco::Pco::execute(array: &Self::Array, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::executor::ExecutionStep>

pub fn vortex_pco::Pco::id(_array: &Self::Array) -> vortex_array::vtable::dyn_::ArrayId

Expand Down Expand Up @@ -74,7 +74,7 @@ pub struct vortex_pco::PcoArray

impl vortex_pco::PcoArray

pub fn vortex_pco::PcoArray::decompress(&self) -> vortex_error::VortexResult<vortex_array::arrays::primitive::array::PrimitiveArray>
pub fn vortex_pco::PcoArray::decompress(&self, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::arrays::primitive::array::PrimitiveArray>

pub fn vortex_pco::PcoArray::from_array(array: vortex_array::array::ArrayRef, level: usize, nums_per_page: usize) -> vortex_error::VortexResult<Self>

Expand Down
23 changes: 16 additions & 7 deletions encodings/pco/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ use vortex_array::DynArray;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionStep;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::Primitive;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::buffer::BufferHandle;
Expand Down Expand Up @@ -263,8 +265,8 @@ impl VTable for Pco {
Ok(())
}

fn execute(array: &Self::Array, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
Ok(ExecutionStep::Done(array.decompress()?.into_array()))
fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
Ok(ExecutionStep::Done(array.decompress(ctx)?.into_array()))
}

fn reduce_parent(
Expand Down Expand Up @@ -437,14 +439,14 @@ impl PcoArray {
}
}

pub fn decompress(&self) -> VortexResult<PrimitiveArray> {
pub fn decompress(&self, ctx: &mut ExecutionCtx) -> VortexResult<PrimitiveArray> {
// To start, we figure out which chunks and pages we need to decompress, and with
// what value offset into the first such page.
let number_type = number_type_from_dtype(&self.dtype);
let values_byte_buffer = match_number_enum!(
number_type,
NumberType<T> => {
self.decompress_values_typed::<T>()?
self.decompress_values_typed::<T>(ctx)?
}
);

Expand All @@ -457,11 +459,14 @@ impl PcoArray {
))
}

fn decompress_values_typed<T: Number>(&self) -> VortexResult<ByteBuffer> {
fn decompress_values_typed<T: Number>(
&self,
ctx: &mut ExecutionCtx,
) -> VortexResult<ByteBuffer> {
// To start, we figure out what range of values we need to decompress.
let slice_value_indices = self
.unsliced_validity
.to_mask(self.unsliced_n_rows)
.execute_mask(self.unsliced_n_rows, ctx)?
.valid_counts_for_indices(&[self.slice_start, self.slice_stop]);
let slice_value_start = slice_value_indices[0];
let slice_value_stop = slice_value_indices[1];
Expand Down Expand Up @@ -564,7 +569,11 @@ impl ValiditySliceHelper for PcoArray {

impl OperationsVTable<Pco> for Pco {
fn scalar_at(array: &PcoArray, index: usize) -> VortexResult<Scalar> {
array._slice(index, index + 1).decompress()?.scalar_at(0)
let mut ctx = LEGACY_SESSION.create_execution_ctx();
array
._slice(index, index + 1)
.decompress(&mut ctx)?
.scalar_at(0)
}
}

Expand Down
23 changes: 17 additions & 6 deletions encodings/pco/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::LazyLock;

use vortex_array::ArrayContext;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::BoolArray;
Expand Down Expand Up @@ -47,7 +48,8 @@ fn test_compress_decompress() {
assert!(compressed.pages.len() < array.nbytes() as usize);

// check full decompression works
let decompressed = compressed.decompress().unwrap();
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let decompressed = compressed.decompress(&mut ctx).unwrap();
assert_arrays_eq!(decompressed, PrimitiveArray::from_iter(data));

// check slicing works
Expand All @@ -69,7 +71,8 @@ fn test_compress_decompress_small() {
let expected = array.into_array();
assert_arrays_eq!(compressed, expected);

let decompressed = compressed.decompress().unwrap();
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let decompressed = compressed.decompress(&mut ctx).unwrap();
assert_arrays_eq!(decompressed, expected);
}

Expand All @@ -78,7 +81,8 @@ fn test_empty() {
let data: Vec<i32> = vec![];
let array = PrimitiveArray::from_iter(data.clone());
let compressed = PcoArray::from_primitive(&array, 3, 100).unwrap();
let primitive = compressed.decompress().unwrap();
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let primitive = compressed.decompress(&mut ctx).unwrap();
assert_arrays_eq!(primitive, PrimitiveArray::from_iter(data));
}

Expand Down Expand Up @@ -118,9 +122,16 @@ fn test_validity_and_multiple_chunks_and_pages() {
assert_nth_scalar!(slice, 0, 100);
assert_nth_scalar!(slice, 2, 102);
let primitive = slice.to_primitive();
assert_eq!(
primitive.validity(),
&Validity::Array(BoolArray::from_iter(vec![true, false, true]).into_array())

let mut ctx = LEGACY_SESSION.create_execution_ctx();
assert!(
primitive
.validity()
.mask_eq(
&Validity::Array(BoolArray::from_iter(vec![true, false, true]).into_array()),
&mut ctx,
)
.unwrap()
);
}

Expand Down
2 changes: 1 addition & 1 deletion encodings/zstd/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub struct vortex_zstd::ZstdArray

impl vortex_zstd::ZstdArray

pub fn vortex_zstd::ZstdArray::decompress(&self) -> vortex_error::VortexResult<vortex_array::array::ArrayRef>
pub fn vortex_zstd::ZstdArray::decompress(&self, ctx: &mut vortex_array::executor::ExecutionCtx) -> vortex_error::VortexResult<vortex_array::array::ArrayRef>

pub fn vortex_zstd::ZstdArray::from_array(array: vortex_array::array::ArrayRef, level: i32, values_per_frame: usize) -> vortex_error::VortexResult<Self>

Expand Down
22 changes: 14 additions & 8 deletions encodings/zstd/src/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ use vortex_array::DynArray;
use vortex_array::ExecutionCtx;
use vortex_array::ExecutionStep;
use vortex_array::IntoArray;
use vortex_array::LEGACY_SESSION;
use vortex_array::Precision;
use vortex_array::ProstMetadata;
use vortex_array::ToCanonical;
use vortex_array::VortexSessionExecute;
use vortex_array::accessor::ArrayAccessor;
use vortex_array::arrays::ConstantArray;
use vortex_array::arrays::PrimitiveArray;
Expand Down Expand Up @@ -274,7 +276,7 @@ impl VTable for Zstd {

fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
array
.decompress()?
.decompress(ctx)?
.execute::<ArrayRef>(ctx)
.map(ExecutionStep::Done)
}
Expand Down Expand Up @@ -702,14 +704,14 @@ impl ZstdArray {
}
}

pub fn decompress(&self) -> VortexResult<ArrayRef> {
pub fn decompress(&self, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
// To start, we figure out which frames we need to decompress, and with
// what row offset into the first such frame.
let byte_width = self.byte_width();
let slice_n_rows = self.slice_stop - self.slice_start;
let slice_value_indices = self
.unsliced_validity
.to_mask(self.unsliced_n_rows)
.execute_mask(self.unsliced_n_rows, ctx)?
.valid_counts_for_indices(&[self.slice_start, self.slice_stop]);

let slice_value_idx_start = slice_value_indices[0];
Expand Down Expand Up @@ -787,14 +789,14 @@ impl ZstdArray {
//
// We ensure that the validity of the decompressed array ALWAYS matches the validity
// implied by the DType.
if !self.dtype().is_nullable() && slice_validity != Validity::NonNullable {
if !self.dtype().is_nullable() && !matches!(slice_validity, Validity::NonNullable) {
assert!(
slice_validity.all_valid(slice_n_rows)?,
matches!(slice_validity, Validity::AllValid),
"ZSTD array expects to be non-nullable but there are nulls after decompression"
);

slice_validity = Validity::NonNullable;
} else if self.dtype.is_nullable() && slice_validity == Validity::NonNullable {
} else if self.dtype.is_nullable() && matches!(slice_validity, Validity::NonNullable) {
slice_validity = Validity::AllValid;
}
//
Expand All @@ -817,7 +819,7 @@ impl ZstdArray {
Ok(primitive.into_array())
}
DType::Binary(_) | DType::Utf8(_) => {
match slice_validity.to_mask(slice_n_rows).indices() {
match slice_validity.execute_mask(slice_n_rows, ctx)?.indices() {
AllOr::All => {
// the decompressed buffer is a bunch of interleaved u32 lengths
// and strings of those lengths, we need to reconstruct the
Expand Down Expand Up @@ -937,6 +939,10 @@ impl ValiditySliceHelper for ZstdArray {

impl OperationsVTable<Zstd> for Zstd {
fn scalar_at(array: &ZstdArray, index: usize) -> VortexResult<Scalar> {
array._slice(index, index + 1).decompress()?.scalar_at(0)
let mut ctx = LEGACY_SESSION.create_execution_ctx();
array
._slice(index, index + 1)
.decompress(&mut ctx)?
.scalar_at(0)
}
}
10 changes: 5 additions & 5 deletions encodings/zstd/src/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use vortex_array::IntoArray;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::scalar_fn::fns::cast::CastReduce;
use vortex_array::validity::Validity;
use vortex_error::VortexResult;

use crate::Zstd;
Expand Down Expand Up @@ -45,11 +46,10 @@ impl CastReduce for Zstd {
}
(Nullability::Nullable, Nullability::NonNullable) => {
// null => non-null works if there are no nulls in the sliced range
let sliced_len = array.slice_stop() - array.slice_start();
let has_nulls = !array
.unsliced_validity
.slice(array.slice_start()..array.slice_stop())?
.all_valid(sliced_len)?;
let has_nulls = !matches!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have to change the comment? This previously was specifically checking the sliced range

array.validity()?,
Validity::AllValid | Validity::NonNullable
);

// We don't attempt to handle casting when there are nulls.
if has_nulls {
Expand Down
Loading
Loading