Skip to content

Commit 4780af9

Browse files
authored
fix: fix handling of all-preamble chunks in mini-block scheduling (lancedb#4823)
1 parent 43e0e7e commit 4780af9

File tree

2 files changed

+76
-32
lines changed

2 files changed

+76
-32
lines changed

rust/lance-encoding/src/encodings/logical/primitive.rs

Lines changed: 54 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,11 +1284,40 @@ enum PreambleAction {
12841284
Absent,
12851285
}
12861286

1287-
// TODO: Add test cases for the all-preamble and all-trailer cases
1288-
12891287
// When we schedule a chunk we use the repetition index (or, if none exists, just the # of items
12901288
// in each chunk) to map a user requested range into a set of ChunkInstruction objects which tell
12911289
// us how exactly to read from the chunk.
1290+
//
1291+
// Examples:
1292+
//
1293+
// | Chunk 0 | Chunk 1 | Chunk 2 | Chunk 3 |
1294+
// | xxxxyyyyzzz | zzzzzzzzz | zzzzzzzzz | aaabbcc |
1295+
//
1296+
// Full read (0..6)
1297+
//
1298+
// Chunk 0: (several rows, ends with trailer)
1299+
// preamble: absent
1300+
// rows_to_skip: 0
1301+
// rows_to_take: 3 (x, y, z)
1302+
// take_trailer: true
1303+
//
1304+
// Chunk 1: (all preamble, ends with trailer)
1305+
// preamble: take
1306+
// rows_to_skip: 0
1307+
// rows_to_take: 0
1308+
// take_trailer: true
1309+
//
1310+
// Chunk 2: (all preamble, no trailer)
1311+
// preamble: take
1312+
// rows_to_skip: 0
1313+
// rows_to_take: 0
1314+
// take_trailer: false
1315+
//
1316+
// Chunk 3: (several rows, no trailer or preamble)
1317+
// preamble: absent
1318+
// rows_to_skip: 0
1319+
// rows_to_take: 3 (a, b, c)
1320+
// take_trailer: false
12921321
#[derive(Clone, Debug, PartialEq, Eq)]
12931322
struct ChunkInstructions {
12941323
// The index of the chunk to read
@@ -1303,13 +1332,13 @@ struct ChunkInstructions {
13031332
//
13041333
// If this is non-zero then premable must not be Take
13051334
rows_to_skip: u64,
1306-
// How many complete (non-preamble / non-trailer) rows to take
1335+
// How many rows to take. If a row splits across chunks then we will count the row in the first
1336+
// chunk that contains the row.
13071337
rows_to_take: u64,
13081338
// A "trailer" is when a chunk ends with a partial list. If there is no repetition index there is
13091339
// never a trailer.
13101340
//
1311-
// It's possible for a chunk to be entirely trailer. This would mean the chunk starts with the beginning
1312-
// of a list and that list is continued in the next chunk.
1341+
// A chunk that is all preamble may or may not have a trailer.
13131342
//
13141343
// If this is true then we want to include the trailer
13151344
take_trailer: bool,
@@ -1401,7 +1430,10 @@ impl ChunkInstructions {
14011430
preamble: PreambleAction::Take,
14021431
rows_to_skip: 0,
14031432
rows_to_take: 0,
1404-
take_trailer: false,
1433+
// We still need to look at has_trailer to distinguish between "all preamble
1434+
// and row ends at end of chunk" and "all preamble and row bleeds into next
1435+
// chunk". Both cases will have 0 rows available.
1436+
take_trailer: chunk.has_trailer,
14051437
});
14061438
// Only set need_preamble = false if the chunk has at least one row,
14071439
// Or we are reaching the last block,
@@ -1441,13 +1473,11 @@ impl ChunkInstructions {
14411473
} else {
14421474
PreambleAction::Absent
14431475
};
1444-
let mut rows_to_take_no_trailer = rows_to_take;
14451476

14461477
// Are we taking the trailer? If so, make sure we mark that we need the preamble
14471478
if rows_to_take == rows_avail && chunk.has_trailer {
14481479
take_trailer = true;
14491480
need_preamble = true;
1450-
rows_to_take_no_trailer -= 1;
14511481
} else {
14521482
need_preamble = false;
14531483
};
@@ -1456,7 +1486,7 @@ impl ChunkInstructions {
14561486
preamble,
14571487
chunk_idx: block_index,
14581488
rows_to_skip: to_skip,
1459-
rows_to_take: rows_to_take_no_trailer,
1489+
rows_to_take,
14601490
take_trailer,
14611491
});
14621492

@@ -1498,7 +1528,7 @@ impl ChunkInstructions {
14981528
) -> (ChunkDrainInstructions, bool) {
14991529
// If we need the premable then we shouldn't be skipping anything
15001530
debug_assert!(!*need_preamble || *skip_in_chunk == 0);
1501-
let mut rows_avail = self.rows_to_take - *skip_in_chunk;
1531+
let rows_avail = self.rows_to_take - *skip_in_chunk;
15021532
let has_preamble = self.preamble != PreambleAction::Absent;
15031533
let preamble_action = match (*need_preamble, has_preamble) {
15041534
(true, true) => PreambleAction::Take,
@@ -1507,16 +1537,16 @@ impl ChunkInstructions {
15071537
(false, false) => PreambleAction::Absent,
15081538
};
15091539

1510-
// Did the scheduled chunk have a trailer? If so, we have one extra row available
1511-
if self.take_trailer {
1512-
rows_avail += 1;
1513-
}
1514-
15151540
// How many rows are we actually taking in this take step (including the preamble
15161541
// and trailer both as individual rows)
15171542
let rows_taking = if *rows_desired >= rows_avail {
15181543
// We want all the rows. If there is a trailer we are grabbing it and will need
15191544
// the preamble of the next chunk
1545+
// If there is a trailer and we are taking all the rows then we need the preamble
1546+
// of the next chunk.
1547+
//
1548+
// Also, if this chunk is entirely preamble (rows_avail == 0 && !take_trailer) then we
1549+
// need the preamble of the next chunk.
15201550
*need_preamble = self.take_trailer;
15211551
rows_avail
15221552
} else {
@@ -1682,14 +1712,7 @@ impl StructuralPageScheduler for MiniBlockScheduler {
16821712
num_rows,
16831713
chunk_instructions
16841714
.iter()
1685-
.map(|ci| {
1686-
let taken = ci.rows_to_take;
1687-
if ci.take_trailer {
1688-
taken + 1
1689-
} else {
1690-
taken
1691-
}
1692-
})
1715+
.map(|ci| ci.rows_to_take)
16931716
.sum::<u64>()
16941717
);
16951718

@@ -1698,6 +1721,7 @@ impl StructuralPageScheduler for MiniBlockScheduler {
16981721
.map(|ci| ci.chunk_idx)
16991722
.unique()
17001723
.collect::<Vec<_>>();
1724+
17011725
let mut loaded_chunks = self.lookup_chunks(&chunks_needed);
17021726
let chunk_ranges = loaded_chunks
17031727
.iter()
@@ -3230,7 +3254,7 @@ const MINIBLOCK_ALIGNMENT: usize = 8;
32303254
/// If the data is wide then we zip together the repetition and definition value
32313255
/// with the value data into a single buffer. This approach is called "zipped".
32323256
///
3233-
/// If there is any repetition information then we create a repetition index (TODO)
3257+
/// If there is any repetition information then we create a repetition index
32343258
///
32353259
/// In addition, the compression process may create zero or more metadata buffers.
32363260
/// For example, a dictionary compression will create dictionary metadata. Any
@@ -3681,9 +3705,9 @@ impl PrimitiveStructuralEncoder {
36813705
let repdef = RepDefBuilder::serialize(repdefs);
36823706

36833707
if let DataBlock::AllNull(_null_block) = data {
3684-
// If we got here then all the data is null but we have rep/def information that
3685-
// we need to store.
3686-
todo!()
3708+
// We should not be using mini-block for all-null. There are other structural
3709+
// encodings for that.
3710+
unreachable!()
36873711
}
36883712

36893713
let num_items = data.num_values();
@@ -4753,7 +4777,7 @@ mod tests {
47534777
chunk_idx: 0,
47544778
preamble: PreambleAction::Absent,
47554779
rows_to_skip: 0,
4756-
rows_to_take: 5,
4780+
rows_to_take: 6,
47574781
take_trailer: true,
47584782
},
47594783
ChunkInstructions {
@@ -4767,7 +4791,7 @@ mod tests {
47674791
chunk_idx: 2,
47684792
preamble: PreambleAction::Absent,
47694793
rows_to_skip: 0,
4770-
rows_to_take: 4,
4794+
rows_to_take: 5,
47714795
take_trailer: true,
47724796
},
47734797
ChunkInstructions {
@@ -4834,7 +4858,7 @@ mod tests {
48344858
chunk_idx: 0,
48354859
preamble: PreambleAction::Absent,
48364860
rows_to_skip: 5,
4837-
rows_to_take: 0,
4861+
rows_to_take: 1,
48384862
take_trailer: true,
48394863
},
48404864
ChunkInstructions {

rust/lance-encoding/src/encodings/physical/value.rs

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -747,9 +747,10 @@ pub(crate) mod tests {
747747
};
748748

749749
use arrow_array::{
750-
make_array, Array, ArrayRef, Decimal128Array, FixedSizeListArray, Int32Array,
750+
make_array, Array, ArrayRef, Decimal128Array, FixedSizeListArray, Int32Array, ListArray,
751+
UInt8Array,
751752
};
752-
use arrow_buffer::{BooleanBuffer, NullBuffer};
753+
use arrow_buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
753754
use arrow_schema::{DataType, Field, TimeUnit};
754755
use lance_datagen::{array, gen_batch, ArrayGeneratorExt, Dimension, RowCount};
755756

@@ -1024,6 +1025,25 @@ pub(crate) mod tests {
10241025
assert_eq!(decompressed.as_ref(), &sample_list);
10251026
}
10261027

1028+
#[test_log::test(tokio::test)]
1029+
async fn regress_list_fsl() {
1030+
// This regresses a case where rows are large lists that span multiple
1031+
// mini-block chunks which gives us some all-premable mini-block chunks.
1032+
let offsets = ScalarBuffer::<i32>::from(vec![0, 393, 755, 1156, 1536]);
1033+
let data = UInt8Array::from(vec![0; 1536 * 16]);
1034+
let fsl_field = Arc::new(Field::new("item", DataType::UInt8, true));
1035+
let fsl = FixedSizeListArray::new(fsl_field, 16, Arc::new(data), None);
1036+
let list_field = Arc::new(Field::new("item", fsl.data_type().clone(), false));
1037+
let list_arr = ListArray::new(list_field, OffsetBuffer::new(offsets), Arc::new(fsl), None);
1038+
1039+
let test_cases = TestCases::default()
1040+
.with_min_file_version(LanceFileVersion::V2_1)
1041+
.with_batch_size(1);
1042+
1043+
check_round_trip_encoding_of_data(vec![Arc::new(list_arr)], &test_cases, HashMap::new())
1044+
.await;
1045+
}
1046+
10271047
fn create_random_fsl() -> Arc<dyn Array> {
10281048
// Several levels of def and multiple pages
10291049
let inner = array::rand_type(&DataType::Int32).with_random_nulls(0.1);

0 commit comments

Comments
 (0)