From 7a40dbbce2ddac99cc59a4893981b1721b2e9e40 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 17 Feb 2025 11:55:29 +0530 Subject: [PATCH 01/31] feat: replace `StreamReader`/`StreamWriter` with `FileReader`/`FileWriter` --- src/parseable/staging/reader.rs | 253 +++++--------------------------- src/parseable/staging/writer.rs | 8 +- src/parseable/streams.rs | 15 +- 3 files changed, 46 insertions(+), 230 deletions(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 6df0dc324..8f7659c1c 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -19,16 +19,14 @@ use std::{ fs::{remove_file, File}, - io::{self, BufReader, Read, Seek, SeekFrom}, + io::BufReader, path::PathBuf, sync::Arc, - vec::IntoIter, }; use arrow_array::{RecordBatch, TimestampMillisecondArray}; -use arrow_ipc::{reader::StreamReader, root_as_message_unchecked, MessageHeader}; +use arrow_ipc::reader::FileReader; use arrow_schema::Schema; -use byteorder::{LittleEndian, ReadBytesExt}; use itertools::kmerge_by; use tracing::error; @@ -39,7 +37,7 @@ use crate::{ #[derive(Debug)] pub struct MergedRecordReader { - pub readers: Vec>>, + pub readers: Vec>>, } impl MergedRecordReader { @@ -53,7 +51,7 @@ impl MergedRecordReader { remove_file(file).unwrap(); } else { let Ok(reader) = - StreamReader::try_new(BufReader::new(File::open(file).unwrap()), None) + FileReader::try_new(BufReader::new(File::open(file).unwrap()), None) else { error!("Invalid file detected, ignoring it: {:?}", file); continue; @@ -74,27 +72,6 @@ impl MergedRecordReader { ) .unwrap() } -} - -#[derive(Debug)] -pub struct MergedReverseRecordReader { - pub readers: Vec>>>, -} - -impl MergedReverseRecordReader { - pub fn try_new(files: &[PathBuf]) -> Self { - let mut readers = Vec::with_capacity(files.len()); - for file in files { - let Ok(reader) = get_reverse_reader(File::open(file).unwrap()) else { - error!("Invalid file detected, ignoring it: {:?}", file); - continue; - }; - - readers.push(reader); - } - - Self { readers } - } pub fn merged_iter( self, @@ -111,15 +88,6 @@ impl MergedReverseRecordReader { .map(|batch| reverse(&batch)) .map(move |batch| adapt_batch(&schema, &batch)) } - - pub fn merged_schema(&self) -> Schema { - Schema::try_merge( - self.readers - .iter() - .map(|reader| reader.schema().as_ref().clone()), - ) - .unwrap() - } } fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option) -> i64 { @@ -138,6 +106,7 @@ fn get_timestamp_millis(batch: &RecordBatch, time_partition: Option) -> None => get_default_timestamp_millis(batch), } } + fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 { match batch .column(0) @@ -157,172 +126,18 @@ fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 { } } -/// OffsetReader takes in a reader and list of offset and sizes and -/// provides a reader over the file by reading only the offsets -/// from start of the list to end. -/// -/// Safety Invariant: Reader is already validated and all offset and limit are valid to read. -/// -/// On empty list the reader returns no bytes read. 
-pub struct OffsetReader { - reader: R, - offset_list: IntoIter<(u64, usize)>, - current_offset: u64, - current_size: usize, - buffer: Vec, - buffer_position: usize, - finished: bool, -} - -impl OffsetReader { - fn new(reader: R, offset_list: Vec<(u64, usize)>) -> Self { - let mut offset_list = offset_list.into_iter(); - let mut finished = false; - - let (current_offset, current_size) = offset_list.next().unwrap_or_default(); - if current_offset == 0 && current_size == 0 { - finished = true - } - - OffsetReader { - reader, - offset_list, - current_offset, - current_size, - buffer: vec![0; 4096], - buffer_position: 0, - finished, - } - } -} - -impl Read for OffsetReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let offset = self.current_offset; - let size = self.current_size; - - if self.finished { - return Ok(0); - } - // on empty buffer load current data represented by - // current_offset and current_size into self buffer - if self.buffer_position == 0 { - self.reader.seek(SeekFrom::Start(offset))?; - // resize for current message - if self.buffer.len() < size { - self.buffer.resize(size, 0) - } - self.reader.read_exact(&mut self.buffer[0..size])?; - } - - let remaining_bytes = size - self.buffer_position; - let max_read = usize::min(remaining_bytes, buf.len()); - - // Copy data from the buffer to the provided buffer - let read_data = &self.buffer[self.buffer_position..self.buffer_position + max_read]; - buf[..max_read].copy_from_slice(read_data); - - self.buffer_position += max_read; - - if self.buffer_position >= size { - // If we've read the entire section, move to the next offset - match self.offset_list.next() { - Some((offset, size)) => { - self.current_offset = offset; - self.current_size = size; - self.buffer_position = 0; - } - None => { - // iter is exhausted, no more read can be done - self.finished = true - } - } - } - - Ok(max_read) - } -} - -pub fn get_reverse_reader( - mut reader: T, -) -> Result>>, io::Error> { - let mut offset = 0; - let mut messages = Vec::new(); - - while let Some(res) = find_limit_and_type(&mut reader).transpose() { - match res { - Ok((header, size)) => { - messages.push((header, offset, size)); - offset += size; - } - Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break, - Err(err) => return Err(err), - } - } - - // reverse everything leaving the first because it has schema message. - messages[1..].reverse(); - let messages = messages - .into_iter() - .map(|(_, offset, size)| (offset as u64, size)) - .collect(); - - // reset reader - reader.rewind()?; - - Ok(StreamReader::try_new(BufReader::new(OffsetReader::new(reader, messages)), None).unwrap()) -} - -// return limit for -fn find_limit_and_type( - reader: &mut (impl Read + Seek), -) -> Result, io::Error> { - let mut size = 0; - let marker = reader.read_u32::()?; - size += 4; - - if marker != 0xFFFFFFFF { - return Err(io::Error::new( - io::ErrorKind::InvalidData, - "Invalid Continuation Marker", - )); - } - - let metadata_size = reader.read_u32::()? 
as usize; - size += 4; - - if metadata_size == 0x00000000 { - return Ok(None); - } - - let mut message = vec![0u8; metadata_size]; - reader.read_exact(&mut message)?; - size += metadata_size; - - let message = unsafe { root_as_message_unchecked(&message) }; - let header = message.header_type(); - let message_size = message.bodyLength(); - size += message_size as usize; - - let padding = (8 - (size % 8)) % 8; - reader.seek(SeekFrom::Current(padding as i64 + message_size))?; - size += padding; - - Ok(Some((header, size))) -} - #[cfg(test)] mod tests { - use std::{io::Cursor, sync::Arc}; + use std::{fs::File, path::Path, sync::Arc}; use arrow_array::{ cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray, }; - use arrow_ipc::writer::{ - write_message, DictionaryTracker, IpcDataGenerator, IpcWriteOptions, StreamWriter, + use arrow_ipc::{ + reader::FileReader, + writer::{write_message, DictionaryTracker, FileWriter, IpcDataGenerator, IpcWriteOptions}, }; - - use super::get_reverse_reader; + use temp_dir::TempDir; fn rb(rows: usize) -> RecordBatch { let array1: Arc = Arc::new(Int64Array::from_iter(0..(rows as i64))); @@ -339,42 +154,48 @@ mod tests { .unwrap() } - fn write_mem(rbs: &[RecordBatch]) -> Vec { - let buf = Vec::new(); - let mut writer = StreamWriter::try_new(buf, &rbs[0].schema()).unwrap(); + fn write_file(rbs: &[RecordBatch], path: &Path) { + let file = File::create(path).unwrap(); + let mut writer = FileWriter::try_new_buffered(file, &rbs[0].schema()).unwrap(); for rb in rbs { writer.write(rb).unwrap() } - writer.into_inner().unwrap() + writer.finish().unwrap(); } #[test] fn test_empty_row() { + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("test.arrows"); let rb = rb(0); - let buf = write_mem(&[rb]); - let reader = Cursor::new(buf); - let mut reader = get_reverse_reader(reader).unwrap(); + write_file(&[rb], &path); + let reader = File::open(path).unwrap(); + let mut reader = FileReader::try_new_buffered(reader, None).unwrap(); let rb = reader.next().unwrap().unwrap(); assert_eq!(rb.num_rows(), 0); } #[test] fn test_one_row() { + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("test.arrows"); let rb = rb(1); - let buf = write_mem(&[rb]); - let reader = Cursor::new(buf); - let mut reader = get_reverse_reader(reader).unwrap(); + write_file(&[rb], &path); + let reader = File::open(path).unwrap(); + let mut reader = FileReader::try_new_buffered(reader, None).unwrap(); let rb = reader.next().unwrap().unwrap(); assert_eq!(rb.num_rows(), 1); } #[test] fn test_multiple_row_multiple_rbs() { - let buf = write_mem(&[rb(1), rb(2), rb(3)]); - let reader = Cursor::new(buf); - let mut reader = get_reverse_reader(reader).unwrap(); + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("test.arrows"); + write_file(&[rb(1), rb(2), rb(3)], &path); + let reader = File::open(path).unwrap(); + let mut reader = FileReader::try_new_buffered(reader, None).unwrap(); let rb = reader.next().unwrap().unwrap(); assert_eq!(rb.num_rows(), 3); let col1_val: Vec = rb @@ -394,12 +215,13 @@ mod tests { #[test] fn manual_write() { + let temp_dir = TempDir::new().unwrap(); + let path = temp_dir.path().join("test.arrows"); let error_on_replacement = true; let options = IpcWriteOptions::default(); let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement); let data_gen = IpcDataGenerator {}; - - let mut buf = Vec::new(); + let mut file = File::create(&path).unwrap(); let rb1 = rb(1); let schema = 
data_gen.schema_to_bytes_with_dictionary_tracker( @@ -407,13 +229,13 @@ mod tests { &mut dictionary_tracker, &options, ); - write_message(&mut buf, schema, &options).unwrap(); + write_message(&mut file, schema, &options).unwrap(); for i in (1..=3).cycle().skip(1).take(10000) { let (_, encoded_message) = data_gen .encoded_batch(&rb(i), &mut dictionary_tracker, &options) .unwrap(); - write_message(&mut buf, encoded_message, &options).unwrap(); + write_message(&mut file, encoded_message, &options).unwrap(); } let schema = data_gen.schema_to_bytes_with_dictionary_tracker( @@ -421,13 +243,14 @@ mod tests { &mut dictionary_tracker, &options, ); - write_message(&mut buf, schema, &options).unwrap(); + write_message(&mut file, schema, &options).unwrap(); - let buf = Cursor::new(buf); - let reader = get_reverse_reader(buf).unwrap().flatten(); + let reader = File::open(path).unwrap(); + let reader = FileReader::try_new_buffered(reader, None).unwrap(); let mut sum = 0; for rb in reader { + let rb = rb.unwrap(); sum += 1; assert!(rb.num_rows() > 0); } diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index c43252f14..13100afdb 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -18,13 +18,11 @@ */ use std::{ - collections::{HashMap, HashSet}, - fs::File, - sync::Arc, + collections::{HashMap, HashSet}, fs::File, io::BufWriter, sync::Arc }; use arrow_array::RecordBatch; -use arrow_ipc::writer::StreamWriter; +use arrow_ipc::writer::FileWriter; use arrow_schema::Schema; use arrow_select::concat::concat_batches; use itertools::Itertools; @@ -34,7 +32,7 @@ use crate::utils::arrow::adapt_batch; #[derive(Default)] pub struct Writer { pub mem: MemWriter<16384>, - pub disk: HashMap>, + pub disk: HashMap>>, } /// Structure to keep recordbatches in memory. 
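
A minimal, self-contained sketch of the round trip this patch switches to (not part of the diff; the in-memory buffer and column name are illustrative). The key behavioural difference from the stream format is that the Arrow IPC file format only becomes readable once `finish()` has written its footer, which is why later commits in this series rename files on finish:

```rust
use std::{io::Cursor, sync::Arc};

use arrow_array::{ArrayRef, Int64Array, RecordBatch};
use arrow_ipc::{reader::FileReader, writer::FileWriter};
use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
    let batch = RecordBatch::try_new(schema.clone(), vec![column])?;

    // Write two batches in Arrow IPC *file* format into an in-memory buffer.
    let mut buffer = Vec::new();
    {
        let mut writer = FileWriter::try_new(&mut buffer, &schema)?;
        writer.write(&batch)?;
        writer.write(&batch)?;
        writer.finish()?; // writes the footer; without it FileReader cannot open the data
    }

    // Read the batches back; FileReader yields Result<RecordBatch, ArrowError>.
    for batch in FileReader::try_new(Cursor::new(buffer), None)? {
        assert_eq!(batch?.num_rows(), 3);
    }
    Ok(())
}
```
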
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 7d8decdca..860ae83a4 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -27,7 +27,7 @@ use std::{ }; use arrow_array::RecordBatch; -use arrow_ipc::writer::StreamWriter; +use arrow_ipc::writer::FileWriter; use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use derive_more::{Deref, DerefMut}; @@ -57,11 +57,7 @@ use crate::{ }; use super::{ - staging::{ - reader::{MergedRecordReader, MergedReverseRecordReader}, - writer::Writer, - StagingError, - }, + staging::{reader::MergedRecordReader, writer::Writer, StagingError}, LogStream, }; @@ -132,7 +128,7 @@ impl Stream { .append(true) .open(&file_path)?; - let mut writer = StreamWriter::try_new(file, &record.schema()) + let mut writer = FileWriter::try_new_buffered(file, &record.schema()) .expect("File and RecordBatch both are checked"); writer.write(record)?; @@ -444,7 +440,7 @@ impl Stream { .set(0); } - // warn!("staging files-\n{staging_files:?}\n"); + warn!("staging files-\n{staging_files:?}\n"); for (parquet_path, arrow_files) in staging_files { metrics::STAGING_FILES .with_label_values(&[&self.stream_name]) @@ -459,7 +455,7 @@ impl Stream { .add(file_size as i64); } - let record_reader = MergedReverseRecordReader::try_new(&arrow_files); + let record_reader = MergedRecordReader::try_new(&arrow_files).unwrap(); if record_reader.readers.is_empty() { continue; } @@ -524,7 +520,6 @@ impl Stream { } let schema = record_reader.merged_schema(); - Schema::try_merge(vec![schema, current_schema]).unwrap() } From 4b3f3df5cbcee8a65aa188630e9e90d6e5b2bb6b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 17 Feb 2025 12:01:16 +0530 Subject: [PATCH 02/31] refactor: can't err --- src/parseable/staging/reader.rs | 4 ++-- src/parseable/staging/writer.rs | 5 ++++- src/parseable/streams.rs | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 8f7659c1c..c9ad5e96a 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -41,7 +41,7 @@ pub struct MergedRecordReader { } impl MergedRecordReader { - pub fn try_new(files: &[PathBuf]) -> Result { + pub fn new(files: &[PathBuf]) -> Self { let mut readers = Vec::with_capacity(files.len()); for file in files { @@ -61,7 +61,7 @@ impl MergedRecordReader { } } - Ok(Self { readers }) + Self { readers } } pub fn merged_schema(&self) -> Schema { diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 13100afdb..bcd789338 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -18,7 +18,10 @@ */ use std::{ - collections::{HashMap, HashSet}, fs::File, io::BufWriter, sync::Arc + collections::{HashMap, HashSet}, + fs::File, + io::BufWriter, + sync::Arc, }; use arrow_array::RecordBatch; diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 860ae83a4..f05e19040 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -455,7 +455,7 @@ impl Stream { .add(file_size as i64); } - let record_reader = MergedRecordReader::try_new(&arrow_files).unwrap(); + let record_reader = MergedRecordReader::new(&arrow_files); if record_reader.readers.is_empty() { continue; } @@ -514,7 +514,7 @@ impl Stream { pub fn updated_schema(&self, current_schema: Schema) -> Schema { let staging_files = self.arrow_files(); - let record_reader = MergedRecordReader::try_new(&staging_files).unwrap(); + let record_reader = 
MergedRecordReader::new(&staging_files); if record_reader.readers.is_empty() { return current_schema; } From e65d4e65ade54d6febfc8b415dc8dba192778a8f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 20 Feb 2025 14:46:02 +0530 Subject: [PATCH 03/31] fix: query logs from on-disk arrow files --- src/parseable/staging/mod.rs | 6 +++ src/parseable/staging/writer.rs | 53 +++++++++++++++++++- src/parseable/streams.rs | 78 ++++++++++++++++++----------- src/query/stream_schema_provider.rs | 2 +- 4 files changed, 108 insertions(+), 31 deletions(-) diff --git a/src/parseable/staging/mod.rs b/src/parseable/staging/mod.rs index 256133841..2c77df0af 100644 --- a/src/parseable/staging/mod.rs +++ b/src/parseable/staging/mod.rs @@ -20,6 +20,12 @@ pub mod reader; pub mod writer; +/// File extension for "finish"ed arrow files in staging +const ARROW_FILE_EXTENSION: &str = "data.arrows"; + +/// File extension for un"finish"ed arrow files in staging +const ARROW_PART_FILE_EXTENSION: &str = "part.arrows"; + #[derive(Debug, thiserror::Error)] pub enum StagingError { #[error("Unable to create recordbatch stream")] diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index bcd789338..4b84354d0 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -19,8 +19,9 @@ use std::{ collections::{HashMap, HashSet}, - fs::File, + fs::{File, OpenOptions}, io::BufWriter, + path::PathBuf, sync::Arc, }; @@ -32,10 +33,58 @@ use itertools::Itertools; use crate::utils::arrow::adapt_batch; +use super::{StagingError, ARROW_FILE_EXTENSION, ARROW_PART_FILE_EXTENSION}; + +/// Context regarding `.arrows` file being persisted onto disk +pub struct DiskWriter { + inner: FileWriter>, + // Used to ensure un"finish"ed arrow files are renamed on "finish" + path_prefix: PathBuf, +} + +impl DiskWriter { + pub fn new(path_prefix: PathBuf, schema: &Schema) -> Result { + // Live writes happen into partfile + let mut partfile_path = path_prefix.clone(); + partfile_path.set_extension(ARROW_PART_FILE_EXTENSION); + let file = OpenOptions::new() + .create(true) + .append(true) + .open(partfile_path)?; + + Ok(Self { + inner: FileWriter::try_new_buffered(file, schema) + .expect("File and RecordBatch both are checked"), + path_prefix, + }) + } + + /// Appends records into an `.arrows` file + pub fn write(&mut self, batch: &RecordBatch) -> Result<(), StagingError> { + self.inner.write(batch).map_err(StagingError::Arrow) + } + + /// Ensures `.arrows`` file in staging directory is "finish"ed and renames it from "part". + pub fn finish(mut self) -> Result<(), StagingError> { + self.inner.finish()?; + + let mut partfile_path = self.path_prefix.clone(); + partfile_path.set_extension(ARROW_PART_FILE_EXTENSION); + + let mut arrows_path = self.path_prefix; + arrows_path.set_extension(ARROW_FILE_EXTENSION); + + // Rename from part file to finished arrows file + std::fs::rename(partfile_path, arrows_path)?; + + Ok(()) + } +} + #[derive(Default)] pub struct Writer { pub mem: MemWriter<16384>, - pub disk: HashMap>>, + pub disk: HashMap, } /// Structure to keep recordbatches in memory. 
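
The `DiskWriter` introduced above writes into a `.part.arrows` path and renames it to `.data.arrows` on finish. A stripped-down sketch of that pattern, independent of the patch (function name and byte payload are illustrative):

```rust
use std::{fs, io::Write, path::Path};

/// Write `payload`, making it visible under the final name only once complete.
fn persist_then_publish(prefix: &Path, payload: &[u8]) -> std::io::Result<()> {
    let part = prefix.with_extension("part.arrows");
    let done = prefix.with_extension("data.arrows");

    // Live writes only ever touch the part file, which readers ignore.
    let mut file = fs::File::create(&part)?;
    file.write_all(payload)?;
    file.sync_all()?;

    // On POSIX filesystems rename is atomic, so the `.data.arrows` name
    // never refers to a half-written file.
    fs::rename(&part, &done)
}
```

Later commits in this series additionally filter `part` files out of the staging listing so they are never picked up for parquet conversion.
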
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index f05e19040..fcd9075b1 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -27,7 +27,6 @@ use std::{ }; use arrow_array::RecordBatch; -use arrow_ipc::writer::FileWriter; use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use derive_more::{Deref, DerefMut}; @@ -41,7 +40,7 @@ use parquet::{ }; use rand::distributions::DistString; use relative_path::RelativePathBuf; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use crate::{ cli::Options, @@ -57,7 +56,11 @@ use crate::{ }; use super::{ - staging::{reader::MergedRecordReader, writer::Writer, StagingError}, + staging::{ + reader::MergedRecordReader, + writer::{DiskWriter, Writer}, + StagingError, + }, LogStream, }; @@ -65,8 +68,6 @@ use super::{ #[error("Stream not found: {0}")] pub struct StreamNotFound(pub String); -const ARROW_FILE_EXTENSION: &str = "data.arrows"; - pub type StreamRef = Arc; /// All state associated with a single logstream in Parseable. @@ -116,22 +117,15 @@ impl Stream { } None => { // entry is not present thus we create it - let file_path = self.path_by_current_time( + let path_prefix = self.path_prefix_by_current_time( schema_key, parsed_timestamp, custom_partition_values, ); - std::fs::create_dir_all(&self.data_path)?; - - let file = OpenOptions::new() - .create(true) - .append(true) - .open(&file_path)?; - - let mut writer = FileWriter::try_new_buffered(file, &record.schema()) - .expect("File and RecordBatch both are checked"); + let mut writer = DiskWriter::new(path_prefix, &record.schema())?; writer.write(record)?; + guard.disk.insert(schema_key.to_owned(), writer); } }; @@ -142,7 +136,7 @@ impl Stream { Ok(()) } - pub fn path_by_current_time( + pub fn path_prefix_by_current_time( &self, stream_hash: &str, parsed_timestamp: NaiveDateTime, @@ -153,7 +147,7 @@ impl Stream { hostname.push_str(id); } let filename = format!( - "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}", + "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}", Utc::now().format("%Y%m%dT%H%M"), parsed_timestamp.date(), parsed_timestamp.hour(), @@ -345,8 +339,26 @@ impl Stream { Ok(()) } - pub fn recordbatches_cloned(&self, schema: &Arc) -> Vec { - self.writer.lock().unwrap().mem.recordbatch_cloned(schema) + /// Returns records batches as found in staging + pub fn recordbatches_cloned( + &self, + schema: &Arc, + time_partition: Option, + ) -> Vec { + // All records found in memory + let mut records = self.writer.lock().unwrap().mem.recordbatch_cloned(schema); + // Append records batches picked up from `.arrows` files + let arrow_files = self.arrow_files(); + let record_reader = MergedRecordReader::new(&arrow_files); + if record_reader.readers.is_empty() { + return vec![]; + } + let mut from_file = record_reader + .merged_iter(schema.clone(), time_partition) + .collect(); + records.append(&mut from_file); + + records } pub fn clear(&self) { @@ -354,7 +366,7 @@ impl Stream { } pub fn flush(&self) { - let mut disk_writers = { + let disk_writers = { let mut writer = self.writer.lock().unwrap(); // Flush memory writer.mem.clear(); @@ -363,8 +375,12 @@ impl Stream { }; // Flush disk - for writer in disk_writers.values_mut() { - _ = writer.finish(); + for (_, writer) in disk_writers { + if let Err(err) = writer.finish() { + warn!("Couldn't finish `.arrows` file: {err}"); + } else { + debug!("Finished `.arrows` file sync onto disk") + } } } @@ -855,7 
+871,7 @@ mod tests { ); let expected_path = staging.data_path.join(format!( - "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}", + "{}{stream_hash}.date={}.hour={:02}.minute={}.{}", Utc::now().format("%Y%m%dT%H%M"), parsed_timestamp.date(), parsed_timestamp.hour(), @@ -863,8 +879,11 @@ mod tests { hostname::get().unwrap().into_string().unwrap() )); - let generated_path = - staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values); + let generated_path = staging.path_prefix_by_current_time( + stream_hash, + parsed_timestamp, + &custom_partition_values, + ); assert_eq!(generated_path, expected_path); } @@ -890,7 +909,7 @@ mod tests { ); let expected_path = staging.data_path.join(format!( - "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}", + "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}", Utc::now().format("%Y%m%dT%H%M"), parsed_timestamp.date(), parsed_timestamp.hour(), @@ -898,8 +917,11 @@ mod tests { hostname::get().unwrap().into_string().unwrap() )); - let generated_path = - staging.path_by_current_time(stream_hash, parsed_timestamp, &custom_partition_values); + let generated_path = staging.path_prefix_by_current_time( + stream_hash, + parsed_timestamp, + &custom_partition_values, + ); assert_eq!(generated_path, expected_path); } diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 8e3e62cfa..73cc47eff 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -233,7 +233,7 @@ impl StandardTableProvider { }; // Staging arrow exection plan - let records = staging.recordbatches_cloned(&self.schema); + let records = staging.recordbatches_cloned(&self.schema, staging.get_time_partition()); let arrow_exec = reversed_mem_table(records, self.schema.clone())? 
.scan(state, projection, filters, limit) .await?; From a829b9e7e734856a33cb0e1fb826df8024c4f245 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 20 Feb 2025 15:09:41 +0530 Subject: [PATCH 04/31] fix: `create_dir_all` --- src/parseable/streams.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index fcd9075b1..6335b9b72 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -122,6 +122,7 @@ impl Stream { parsed_timestamp, custom_partition_values, ); + std::fs::create_dir_all(&self.data_path)?; let mut writer = DiskWriter::new(path_prefix, &record.schema())?; writer.write(record)?; From 261fd660ebffb0389c5af1b8b7603bf22d9549c5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 20 Feb 2025 21:50:34 +0530 Subject: [PATCH 05/31] fix: convert only finished arrow files --- src/parseable/mod.rs | 6 ++++++ src/parseable/staging/mod.rs | 6 ------ src/parseable/staging/writer.rs | 3 ++- src/parseable/streams.rs | 7 +++++-- 4 files changed, 13 insertions(+), 9 deletions(-) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 10da8c1b0..872eaa751 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -58,6 +58,12 @@ use crate::{ mod staging; mod streams; +/// File extension for "finish"ed arrow files in staging +const ARROW_FILE_EXTENSION: &str = "data.arrows"; + +/// File extension for un"finish"ed arrow files in staging +const ARROW_PART_FILE_EXTENSION: &str = "part.arrows"; + /// Name of a Stream /// NOTE: this used to be a struct, flattened out for simplicity pub type LogStream = String; diff --git a/src/parseable/staging/mod.rs b/src/parseable/staging/mod.rs index 2c77df0af..256133841 100644 --- a/src/parseable/staging/mod.rs +++ b/src/parseable/staging/mod.rs @@ -20,12 +20,6 @@ pub mod reader; pub mod writer; -/// File extension for "finish"ed arrow files in staging -const ARROW_FILE_EXTENSION: &str = "data.arrows"; - -/// File extension for un"finish"ed arrow files in staging -const ARROW_PART_FILE_EXTENSION: &str = "part.arrows"; - #[derive(Debug, thiserror::Error)] pub enum StagingError { #[error("Unable to create recordbatch stream")] diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 4b84354d0..7db1658eb 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -31,9 +31,10 @@ use arrow_schema::Schema; use arrow_select::concat::concat_batches; use itertools::Itertools; +use crate::parseable::{ARROW_FILE_EXTENSION, ARROW_PART_FILE_EXTENSION}; use crate::utils::arrow::adapt_batch; -use super::{StagingError, ARROW_FILE_EXTENSION, ARROW_PART_FILE_EXTENSION}; +use super::StagingError; /// Context regarding `.arrows` file being persisted onto disk pub struct DiskWriter { diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 6335b9b72..5e4e4c40d 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -61,7 +61,7 @@ use super::{ writer::{DiskWriter, Writer}, StagingError, }, - LogStream, + LogStream, ARROW_FILE_EXTENSION, }; #[derive(Debug, thiserror::Error)] @@ -170,7 +170,10 @@ impl Stream { let paths = dir .flatten() .map(|file| file.path()) - .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows"))) + .filter(|path| { + path.file_name() + .is_some_and(|f| f.to_string_lossy().ends_with(ARROW_FILE_EXTENSION)) + }) .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); From 01abc11d5869efbf73d3714928b9d254a17d12e1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 20 
Feb 2025 22:37:53 +0530 Subject: [PATCH 06/31] fix: limit to 1684 rows per arrow file --- src/parseable/staging/writer.rs | 54 ++++++++++++++++++++++++--------- src/parseable/streams.rs | 4 +-- 2 files changed, 42 insertions(+), 16 deletions(-) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 7db1658eb..b4def9145 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -37,17 +37,22 @@ use crate::utils::arrow::adapt_batch; use super::StagingError; /// Context regarding `.arrows` file being persisted onto disk -pub struct DiskWriter { +pub struct DiskWriter { inner: FileWriter>, - // Used to ensure un"finish"ed arrow files are renamed on "finish" + /// Used to ensure un"finish"ed arrow files are renamed on "finish" path_prefix: PathBuf, + /// Number of rows written onto disk + count: usize, + /// Denotes distinct files created with similar schema during the same minute by the same ingestor + file_id: usize, } -impl DiskWriter { +impl DiskWriter { pub fn new(path_prefix: PathBuf, schema: &Schema) -> Result { + let file_id = 0; // Live writes happen into partfile let mut partfile_path = path_prefix.clone(); - partfile_path.set_extension(ARROW_PART_FILE_EXTENSION); + partfile_path.set_extension(format!("{file_id}.{ARROW_PART_FILE_EXTENSION}")); let file = OpenOptions::new() .create(true) .append(true) @@ -57,35 +62,56 @@ impl DiskWriter { inner: FileWriter::try_new_buffered(file, schema) .expect("File and RecordBatch both are checked"), path_prefix, + count: 0, + file_id, }) } - /// Appends records into an `.arrows` file - pub fn write(&mut self, batch: &RecordBatch) -> Result<(), StagingError> { - self.inner.write(batch).map_err(StagingError::Arrow) + /// Appends records into an `{file_id}.part.arrows` files, + /// flushing onto disk and increments count on breaching row limit. + pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> { + if self.count + rb.num_rows() >= N { + let left = N - self.count; + let left_slice = rb.slice(0, left); + self.inner.write(&left_slice)?; + self.finish()?; + + // Write leftover records into new files until all have been written + if left < rb.num_rows() { + let right = rb.num_rows() - left; + self.write(&rb.slice(left, right))?; + } + } else { + self.inner.write(rb)?; + } + + Ok(()) } /// Ensures `.arrows`` file in staging directory is "finish"ed and renames it from "part". - pub fn finish(mut self) -> Result<(), StagingError> { + pub fn finish(&mut self) -> Result<(), StagingError> { self.inner.finish()?; let mut partfile_path = self.path_prefix.clone(); - partfile_path.set_extension(ARROW_PART_FILE_EXTENSION); + partfile_path.set_extension(format!("{}.{ARROW_PART_FILE_EXTENSION}", self.file_id)); - let mut arrows_path = self.path_prefix; - arrows_path.set_extension(ARROW_FILE_EXTENSION); + let mut arrows_path = self.path_prefix.clone(); + arrows_path.set_extension(format!("{}.{ARROW_FILE_EXTENSION}", self.file_id)); // Rename from part file to finished arrows file std::fs::rename(partfile_path, arrows_path)?; + self.file_id += 1; + self.count = 0; + Ok(()) } } #[derive(Default)] -pub struct Writer { - pub mem: MemWriter<16384>, - pub disk: HashMap, +pub struct Writer { + pub mem: MemWriter, + pub disk: HashMap>, } /// Structure to keep recordbatches in memory. 
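
The row cap above is enforced by splitting incoming batches with `RecordBatch::slice`, which shares the underlying buffers rather than copying them. A small illustrative helper (not the patch's code) showing the split semantics, where `budget` stands for the rows still allowed in the current file:

```rust
use arrow_array::RecordBatch;

/// Split `rb` so the first part fits in the remaining row budget.
fn split_at_budget(rb: &RecordBatch, budget: usize) -> (RecordBatch, Option<RecordBatch>) {
    if rb.num_rows() <= budget {
        return (rb.clone(), None);
    }
    // slice(offset, length) is zero-copy, so splitting large batches is cheap.
    let head = rb.slice(0, budget);
    let tail = rb.slice(budget, rb.num_rows() - budget);
    (head, Some(tail))
}
```
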
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 5e4e4c40d..42ba083bb 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -76,7 +76,7 @@ pub struct Stream { pub metadata: RwLock, pub data_path: PathBuf, pub options: Arc, - pub writer: Mutex, + pub writer: Mutex>, pub ingestor_id: Option, } @@ -379,7 +379,7 @@ impl Stream { }; // Flush disk - for (_, writer) in disk_writers { + for (_, mut writer) in disk_writers { if let Err(err) = writer.finish() { warn!("Couldn't finish `.arrows` file: {err}"); } else { From ceb0659e410e44fd322bc63ed1a3e8a609da237d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 20 Feb 2025 22:51:43 +0530 Subject: [PATCH 07/31] test: no `manual_write` and fix read order --- src/parseable/staging/reader.rs | 70 +++++++++------------------------ 1 file changed, 18 insertions(+), 52 deletions(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index c9ad5e96a..403828893 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -133,10 +133,7 @@ mod tests { use arrow_array::{ cast::AsArray, types::Int64Type, Array, Float64Array, Int64Array, RecordBatch, StringArray, }; - use arrow_ipc::{ - reader::FileReader, - writer::{write_message, DictionaryTracker, FileWriter, IpcDataGenerator, IpcWriteOptions}, - }; + use arrow_ipc::{reader::FileReader, writer::FileWriter}; use temp_dir::TempDir; fn rb(rows: usize) -> RecordBatch { @@ -197,64 +194,33 @@ mod tests { let reader = File::open(path).unwrap(); let mut reader = FileReader::try_new_buffered(reader, None).unwrap(); let rb = reader.next().unwrap().unwrap(); - assert_eq!(rb.num_rows(), 3); + assert_eq!(rb.num_rows(), 1); let col1_val: Vec = rb .column(0) .as_primitive::() .iter() .flatten() .collect(); - assert_eq!(col1_val, vec![0, 1, 2]); + assert_eq!(col1_val, vec![0]); let rb = reader.next().unwrap().unwrap(); assert_eq!(rb.num_rows(), 2); + let col1_val: Vec = rb + .column(0) + .as_primitive::() + .iter() + .flatten() + .collect(); + assert_eq!(col1_val, vec![0, 1]); let rb = reader.next().unwrap().unwrap(); - assert_eq!(rb.num_rows(), 1); - } - - #[test] - fn manual_write() { - let temp_dir = TempDir::new().unwrap(); - let path = temp_dir.path().join("test.arrows"); - let error_on_replacement = true; - let options = IpcWriteOptions::default(); - let mut dictionary_tracker = DictionaryTracker::new(error_on_replacement); - let data_gen = IpcDataGenerator {}; - let mut file = File::create(&path).unwrap(); - let rb1 = rb(1); - - let schema = data_gen.schema_to_bytes_with_dictionary_tracker( - &rb1.schema(), - &mut dictionary_tracker, - &options, - ); - write_message(&mut file, schema, &options).unwrap(); - - for i in (1..=3).cycle().skip(1).take(10000) { - let (_, encoded_message) = data_gen - .encoded_batch(&rb(i), &mut dictionary_tracker, &options) - .unwrap(); - write_message(&mut file, encoded_message, &options).unwrap(); - } - - let schema = data_gen.schema_to_bytes_with_dictionary_tracker( - &rb1.schema(), - &mut dictionary_tracker, - &options, - ); - write_message(&mut file, schema, &options).unwrap(); - - let reader = File::open(path).unwrap(); - let reader = FileReader::try_new_buffered(reader, None).unwrap(); - - let mut sum = 0; - for rb in reader { - let rb = rb.unwrap(); - sum += 1; - assert!(rb.num_rows() > 0); - } - - assert_eq!(sum, 10000); + assert_eq!(rb.num_rows(), 3); + let col1_val: Vec = rb + .column(0) + .as_primitive::() + .iter() + .flatten() + .collect(); + assert_eq!(col1_val, vec![0, 1, 2]); } 
} From 0e29387539c578e349f0d2d41303bfc8d77995d1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 12:00:56 +0530 Subject: [PATCH 08/31] log: don't log "string files" --- src/parseable/streams.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 42ba083bb..f5165c3d2 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -460,7 +460,6 @@ impl Stream { .set(0); } - warn!("staging files-\n{staging_files:?}\n"); for (parquet_path, arrow_files) in staging_files { metrics::STAGING_FILES .with_label_values(&[&self.stream_name]) From 6fc59d59328169dd70bab100690edb78ffdd0904 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 16:15:06 +0530 Subject: [PATCH 09/31] fix: increment row count --- src/parseable/staging/writer.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index b4def9145..bd5a1ca60 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -83,6 +83,7 @@ impl DiskWriter { } } else { self.inner.write(rb)?; + self.count += rb.num_rows(); } Ok(()) From 240eb99d9a22c6749c6cc03ba2192e2e9542ea3a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 16:18:06 +0530 Subject: [PATCH 10/31] fix: don't drop in-memory records --- src/parseable/streams.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index f5165c3d2..a59c5430d 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -355,7 +355,7 @@ impl Stream { let arrow_files = self.arrow_files(); let record_reader = MergedRecordReader::new(&arrow_files); if record_reader.readers.is_empty() { - return vec![]; + return records; } let mut from_file = record_reader .merged_iter(schema.clone(), time_partition) From 76c7d58628b41ba15c96d806ce5db3b4224d4517 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 16:36:40 +0530 Subject: [PATCH 11/31] fix: `.arrows` naming convention --- src/parseable/staging/writer.rs | 21 +++++++++++---------- src/parseable/streams.rs | 3 ++- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index bd5a1ca60..a85f792f3 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -21,7 +21,6 @@ use std::{ collections::{HashMap, HashSet}, fs::{File, OpenOptions}, io::BufWriter, - path::PathBuf, sync::Arc, }; @@ -40,7 +39,7 @@ use super::StagingError; pub struct DiskWriter { inner: FileWriter>, /// Used to ensure un"finish"ed arrow files are renamed on "finish" - path_prefix: PathBuf, + path_prefix: String, /// Number of rows written onto disk count: usize, /// Denotes distinct files created with similar schema during the same minute by the same ingestor @@ -48,11 +47,10 @@ pub struct DiskWriter { } impl DiskWriter { - pub fn new(path_prefix: PathBuf, schema: &Schema) -> Result { + pub fn new(path_prefix: String, schema: &Schema) -> Result { let file_id = 0; // Live writes happen into partfile - let mut partfile_path = path_prefix.clone(); - partfile_path.set_extension(format!("{file_id}.{ARROW_PART_FILE_EXTENSION}")); + let partfile_path = format!("{path_prefix}.{file_id}.{ARROW_PART_FILE_EXTENSION}"); let file = OpenOptions::new() .create(true) .append(true) @@ -93,11 +91,14 @@ impl DiskWriter { pub fn finish(&mut self) -> Result<(), StagingError> { self.inner.finish()?; - let mut partfile_path = 
self.path_prefix.clone(); - partfile_path.set_extension(format!("{}.{ARROW_PART_FILE_EXTENSION}", self.file_id)); - - let mut arrows_path = self.path_prefix.clone(); - arrows_path.set_extension(format!("{}.{ARROW_FILE_EXTENSION}", self.file_id)); + let partfile_path = format!( + "{}.{}.{ARROW_PART_FILE_EXTENSION}", + self.path_prefix, self.file_id + ); + let arrows_path = format!( + "{}.{}.{ARROW_FILE_EXTENSION}", + self.path_prefix, self.file_id + ); // Rename from part file to finished arrows file std::fs::rename(partfile_path, arrows_path)?; diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index a59c5430d..40a4c3abb 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -124,7 +124,8 @@ impl Stream { ); std::fs::create_dir_all(&self.data_path)?; - let mut writer = DiskWriter::new(path_prefix, &record.schema())?; + let mut writer = + DiskWriter::new(path_prefix.display().to_string(), &record.schema())?; writer.write(record)?; guard.disk.insert(schema_key.to_owned(), writer); From acb5edecc6ce6e94688005eeab147c16dfddbaee Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 16:41:07 +0530 Subject: [PATCH 12/31] fix: actually replace with new file --- src/parseable/staging/writer.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index a85f792f3..112a3bea5 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -106,6 +106,18 @@ impl DiskWriter { self.file_id += 1; self.count = 0; + let partfile_path = format!( + "{}.{}.{ARROW_PART_FILE_EXTENSION}", + self.path_prefix, self.file_id + ); + let file = OpenOptions::new() + .create(true) + .append(true) + .open(partfile_path)?; + + self.inner = FileWriter::try_new_buffered(file, &self.inner.schema()) + .expect("File and RecordBatch both are checked"); + Ok(()) } } From 06524b6f6fdffdeee0ebb8cfc928227f7a616b69 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 16:44:01 +0530 Subject: [PATCH 13/31] ci: clippy suggestion --- src/parseable/staging/writer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 112a3bea5..24068641c 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -115,7 +115,7 @@ impl DiskWriter { .append(true) .open(partfile_path)?; - self.inner = FileWriter::try_new_buffered(file, &self.inner.schema()) + self.inner = FileWriter::try_new_buffered(file, self.inner.schema()) .expect("File and RecordBatch both are checked"); Ok(()) From 224574ad6eb6c108eea614f6d337b945fddd5fc2 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 20:56:32 +0530 Subject: [PATCH 14/31] fix: properly group arrows into parquet --- src/parseable/streams.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 40a4c3abb..f01655649 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -31,6 +31,7 @@ use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use derive_more::{Deref, DerefMut}; use itertools::Itertools; +use once_cell::sync::Lazy; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -39,6 +40,7 @@ use parquet::{ schema::types::ColumnPath, }; use rand::distributions::DistString; +use regex::Regex; use relative_path::RelativePathBuf; use tracing::{debug, error, info, trace, warn}; @@ -64,6 +66,10 @@ 
use super::{ LogStream, ARROW_FILE_EXTENSION, }; +static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { + Regex::new(r"^[[:alnum:]]+\.(?P\S+)\.\d+\.data\.arrows$").expect("Validated regex") +}); + #[derive(Debug, thiserror::Error)] #[error("Stream not found: {0}")] pub struct StreamNotFound(pub String); @@ -281,10 +287,14 @@ impl Stream { } fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf { - let filename = path.file_stem().unwrap().to_str().unwrap(); - let (_, filename) = filename.split_once('.').unwrap(); - assert!(filename.contains('.'), "contains the delim `.`"); - let filename_with_random_number = format!("{filename}.{random_string}.arrows"); + let filename = path.file_name().unwrap().to_str().unwrap(); + let filename = ARROWS_NAME_STRUCTURE + .captures(filename) + .unwrap() + .get(1) + .unwrap() + .as_str(); + let filename_with_random_number = format!("{filename}.data.{random_string}.arrows"); let mut parquet_path = path.to_owned(); parquet_path.set_file_name(filename_with_random_number); parquet_path.set_extension("parquet"); @@ -298,7 +308,7 @@ impl Stream { self.stream_name ); - let time_partition = self.get_time_partition(); + let time_partition: Option = self.get_time_partition(); let custom_partition = self.get_custom_partition(); // read arrow files on disk From 7b8dca146793688581aedce1bb0dfe76c92afd80 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 21:04:03 +0530 Subject: [PATCH 15/31] file type is known --- src/parseable/streams.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index f01655649..53f834246 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -478,10 +478,9 @@ impl Stream { for file in &arrow_files { let file_size = file.metadata().unwrap().len(); - let file_type = file.extension().unwrap().to_str().unwrap(); metrics::STORAGE_SIZE - .with_label_values(&["staging", &self.stream_name, file_type]) + .with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION]) .add(file_size as i64); } @@ -523,13 +522,12 @@ impl Stream { for file in arrow_files { // warn!("file-\n{file:?}\n"); let file_size = file.metadata().unwrap().len(); - let file_type = file.extension().unwrap().to_str().unwrap(); if remove_file(file.clone()).is_err() { error!("Failed to delete file. 
Unstable state"); process::abort() } metrics::STORAGE_SIZE - .with_label_values(&["staging", &self.stream_name, file_type]) + .with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION]) .sub(file_size as i64); } } From c9ff7a7bc40e5cbeb38b05ac11b816651e888afd Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 22:06:44 +0530 Subject: [PATCH 16/31] refactor: simplify in-mem threshold handling --- src/parseable/staging/writer.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 24068641c..bf408e861 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -201,20 +201,15 @@ impl MutableBuffer { fn push(&mut self, rb: &RecordBatch) -> Option> { if self.inner.len() + rb.num_rows() >= N { let left = N - self.inner.len(); - let right = rb.num_rows() - left; let left_slice = rb.slice(0, left); - let right_slice = if left < rb.num_rows() { - Some(rb.slice(left, right)) - } else { - None - }; self.inner.push(left_slice); // take all records let src = Vec::with_capacity(self.inner.len()); let inner = std::mem::replace(&mut self.inner, src); - if let Some(right_slice) = right_slice { - self.inner.push(right_slice); + if left < rb.num_rows() { + let right = rb.num_rows() - left; + self.inner.push(rb.slice(left, right)); } Some(inner) From 8c2407de26afd7b5c0753a2cd9ccbdd0c497aee6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 23:01:03 +0530 Subject: [PATCH 17/31] don't panic! --- src/parseable/streams.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 53f834246..edde28c80 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -325,8 +325,7 @@ impl Stream { // if yes, then merge them and save if let Some(mut schema) = schema { - let static_schema_flag = self.get_static_schema_flag(); - if !static_schema_flag { + if !self.get_static_schema_flag() { // schema is dynamic, read from staging and merge if present // need to add something before .schema to make the file have an extension of type `schema` @@ -477,7 +476,16 @@ impl Stream { .set(arrow_files.len() as i64); for file in &arrow_files { - let file_size = file.metadata().unwrap().len(); + let file_size = match file.metadata() { + Ok(meta) => meta.len(), + Err(err) => { + error!( + "Looks like the file ({}) was removed; Error = {err}", + file.display() + ); + continue; + } + }; metrics::STORAGE_SIZE .with_label_values(&["staging", &self.stream_name, ARROW_FILE_EXTENSION]) From 487655ef9924a144fd99b0d72b18824c344506ec Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 23:39:09 +0530 Subject: [PATCH 18/31] fix: panic on unexpected filename structure --- src/parseable/streams.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index edde28c80..80274715a 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -70,6 +70,20 @@ static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { Regex::new(r"^[[:alnum:]]+\.(?P\S+)\.\d+\.data\.arrows$").expect("Validated regex") }); +fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { + let filename = path.file_name().unwrap().to_str().unwrap(); + let filename = ARROWS_NAME_STRUCTURE + .captures(filename) + .and_then(|c| c.get(1))? 
+ .as_str(); + let filename_with_random_number = format!("{filename}.data.{random_string}.arrows"); + let mut parquet_path = path.to_owned(); + parquet_path.set_file_name(filename_with_random_number); + parquet_path.set_extension("parquet"); + + Some(parquet_path) +} + #[derive(Debug, thiserror::Error)] #[error("Stream not found: {0}")] pub struct StreamNotFound(pub String); @@ -222,12 +236,13 @@ impl Stream { &arrow_file_path, self.stream_name ); remove_file(&arrow_file_path).unwrap(); - } else { - let key = Self::arrow_path_to_parquet(&arrow_file_path, &random_string); + } else if let Some(key) = arrow_path_to_parquet(&arrow_file_path, &random_string) { grouped_arrow_file .entry(key) .or_default() .push(arrow_file_path); + } else { + warn!("Unexpected arrows file: {}", arrow_file_path.display()); } } grouped_arrow_file @@ -286,21 +301,6 @@ impl Stream { } } - fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf { - let filename = path.file_name().unwrap().to_str().unwrap(); - let filename = ARROWS_NAME_STRUCTURE - .captures(filename) - .unwrap() - .get(1) - .unwrap() - .as_str(); - let filename_with_random_number = format!("{filename}.data.{random_string}.arrows"); - let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename_with_random_number); - parquet_path.set_extension("parquet"); - parquet_path - } - /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal` pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> { info!( From e957173102f8a7f88d1e90797549c40295f80896 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 22 Feb 2025 00:25:07 +0530 Subject: [PATCH 19/31] fix: consider custom partitions --- src/parseable/streams.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 80274715a..7ade7db03 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -67,7 +67,7 @@ use super::{ }; static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { - Regex::new(r"^[[:alnum:]]+\.(?P\S+)\.\d+\.data\.arrows$").expect("Validated regex") + Regex::new(r"^[a-zA-Z0-9&=]+\.(?P\S+)\.\d+\.data\.arrows$").expect("Validated regex") }); fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { From 76b8d2cd47e80bf2c741683563aed5ff875db801 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 22 Feb 2025 00:26:41 +0530 Subject: [PATCH 20/31] doc: custom parition limits --- src/parseable/streams.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 7ade7db03..e3deba9db 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -66,6 +66,7 @@ use super::{ LogStream, ARROW_FILE_EXTENSION, }; +// NOTE: this requires that custom partition values should not have special characters in their name/value static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { Regex::new(r"^[a-zA-Z0-9&=]+\.(?P\S+)\.\d+\.data\.arrows$").expect("Validated regex") }); From 1a75118a77cd8f43adf91b99189d27f7c47fd85b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 22 Feb 2025 00:39:26 +0530 Subject: [PATCH 21/31] doc: improve explainers --- src/parseable/streams.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index e3deba9db..eea568b26 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -66,7 +66,24 @@ use super::{ LogStream, 
ARROW_FILE_EXTENSION, }; -// NOTE: this requires that custom partition values should not have special characters in their name/value +/// Regex pattern for parsing arrow file names. +/// +/// # Format +/// The expected format is: `...data.arrows` +/// where: +/// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value +/// pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2") +/// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition +/// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76") +/// - file_id: Numeric id for individual arrows files +/// +/// # Limitations +/// - Partition keys and values must only contain alphanumeric characters +/// - Special characters in partition values will cause the pattern to fail in capturing +/// +/// # Examples +/// Valid: "key1=value1,key2=value2" +/// Invalid: "key1=special!value,key2=special#value" static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { Regex::new(r"^[a-zA-Z0-9&=]+\.(?P\S+)\.\d+\.data\.arrows$").expect("Validated regex") }); @@ -97,6 +114,7 @@ pub struct Stream { pub metadata: RwLock, pub data_path: PathBuf, pub options: Arc, + /// Writer with a 16KB buffer size for optimal I/O performance. pub writer: Mutex>, pub ingestor_id: Option, } From 9c100410073fb22db539cb0c118eb884cb282dc2 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 22 Feb 2025 01:08:13 +0530 Subject: [PATCH 22/31] doc: `arrow_path_to_parquet` --- src/parseable/streams.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index eea568b26..af9aad489 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -67,7 +67,7 @@ use super::{ }; /// Regex pattern for parsing arrow file names. -/// +/// /// # Format /// The expected format is: `...data.arrows` /// where: @@ -76,11 +76,11 @@ use super::{ /// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition /// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76") /// - file_id: Numeric id for individual arrows files -/// +/// /// # Limitations /// - Partition keys and values must only contain alphanumeric characters /// - Special characters in partition values will cause the pattern to fail in capturing -/// +/// /// # Examples /// Valid: "key1=value1,key2=value2" /// Invalid: "key1=special!value,key2=special#value" @@ -88,16 +88,16 @@ static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { Regex::new(r"^[a-zA-Z0-9&=]+\.(?P\S+)\.\d+\.data\.arrows$").expect("Validated regex") }); +/// Returns the filename for parquet if provided arrows file path is valid as per our expectation fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { let filename = path.file_name().unwrap().to_str().unwrap(); let filename = ARROWS_NAME_STRUCTURE .captures(filename) .and_then(|c| c.get(1))? 
.as_str(); - let filename_with_random_number = format!("{filename}.data.{random_string}.arrows"); + let filename_with_random_number = format!("{filename}.data.{random_string}.parquet"); let mut parquet_path = path.to_owned(); parquet_path.set_file_name(filename_with_random_number); - parquet_path.set_extension("parquet"); Some(parquet_path) } From 2bb93ec7c709e7bb928646dc087f72a5b45f5f09 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 22 Feb 2025 15:45:14 +0530 Subject: [PATCH 23/31] revert: all writes into a single file --- src/parseable/staging/writer.rs | 57 +++++---------------------------- src/parseable/streams.rs | 5 ++- 2 files changed, 10 insertions(+), 52 deletions(-) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index bf408e861..ce1312249 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -29,6 +29,7 @@ use arrow_ipc::writer::FileWriter; use arrow_schema::Schema; use arrow_select::concat::concat_batches; use itertools::Itertools; +use tracing::trace; use crate::parseable::{ARROW_FILE_EXTENSION, ARROW_PART_FILE_EXTENSION}; use crate::utils::arrow::adapt_batch; @@ -40,17 +41,12 @@ pub struct DiskWriter { inner: FileWriter>, /// Used to ensure un"finish"ed arrow files are renamed on "finish" path_prefix: String, - /// Number of rows written onto disk - count: usize, - /// Denotes distinct files created with similar schema during the same minute by the same ingestor - file_id: usize, } impl DiskWriter { pub fn new(path_prefix: String, schema: &Schema) -> Result { - let file_id = 0; // Live writes happen into partfile - let partfile_path = format!("{path_prefix}.{file_id}.{ARROW_PART_FILE_EXTENSION}"); + let partfile_path = format!("{path_prefix}.{ARROW_PART_FILE_EXTENSION}"); let file = OpenOptions::new() .create(true) .append(true) @@ -60,29 +56,12 @@ impl DiskWriter { inner: FileWriter::try_new_buffered(file, schema) .expect("File and RecordBatch both are checked"), path_prefix, - count: 0, - file_id, }) } - /// Appends records into an `{file_id}.part.arrows` files, - /// flushing onto disk and increments count on breaching row limit. 
+ /// Appends records into a `.part.arrows` file pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> { - if self.count + rb.num_rows() >= N { - let left = N - self.count; - let left_slice = rb.slice(0, left); - self.inner.write(&left_slice)?; - self.finish()?; - - // Write leftover records into new files until all have been written - if left < rb.num_rows() { - let right = rb.num_rows() - left; - self.write(&rb.slice(left, right))?; - } - } else { - self.inner.write(rb)?; - self.count += rb.num_rows(); - } + self.inner.write(rb)?; Ok(()) } @@ -91,32 +70,12 @@ impl DiskWriter { pub fn finish(&mut self) -> Result<(), StagingError> { self.inner.finish()?; - let partfile_path = format!( - "{}.{}.{ARROW_PART_FILE_EXTENSION}", - self.path_prefix, self.file_id - ); - let arrows_path = format!( - "{}.{}.{ARROW_FILE_EXTENSION}", - self.path_prefix, self.file_id - ); + let partfile_path = format!("{}.{ARROW_PART_FILE_EXTENSION}", self.path_prefix); + let arrows_path = format!("{}.{ARROW_FILE_EXTENSION}", self.path_prefix); // Rename from part file to finished arrows file - std::fs::rename(partfile_path, arrows_path)?; - - self.file_id += 1; - self.count = 0; - - let partfile_path = format!( - "{}.{}.{ARROW_PART_FILE_EXTENSION}", - self.path_prefix, self.file_id - ); - let file = OpenOptions::new() - .create(true) - .append(true) - .open(partfile_path)?; - - self.inner = FileWriter::try_new_buffered(file, self.inner.schema()) - .expect("File and RecordBatch both are checked"); + std::fs::rename(partfile_path, &arrows_path)?; + trace!("Finished arrows file: {arrows_path}"); Ok(()) } diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index af9aad489..47fdeb7bb 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -69,13 +69,12 @@ use super::{ /// Regex pattern for parsing arrow file names. 
/// /// # Format -/// The expected format is: `...data.arrows` +/// The expected format is: `..data.arrows` /// where: /// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value /// pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2") /// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition /// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76") -/// - file_id: Numeric id for individual arrows files /// /// # Limitations /// - Partition keys and values must only contain alphanumeric characters @@ -85,7 +84,7 @@ use super::{ /// Valid: "key1=value1,key2=value2" /// Invalid: "key1=special!value,key2=special#value" static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { - Regex::new(r"^[a-zA-Z0-9&=]+\.(?P\S+)\.\d+\.data\.arrows$").expect("Validated regex") + Regex::new(r"^[a-zA-Z0-9&=]+\.(?P\S+)\.data\.arrows$").expect("Validated regex") }); /// Returns the filename for parquet if provided arrows file path is valid as per our expectation From 913979a3d4fb5a7faa1cb4bf85e63d41697f5294 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 22 Feb 2025 16:01:47 +0530 Subject: [PATCH 24/31] limit record_batch row count --- src/parseable/staging/mod.rs | 4 ++-- src/parseable/staging/writer.rs | 16 ++++++++-------- src/parseable/streams.rs | 12 ++++++++++-- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/parseable/staging/mod.rs b/src/parseable/staging/mod.rs index 256133841..ced53ce36 100644 --- a/src/parseable/staging/mod.rs +++ b/src/parseable/staging/mod.rs @@ -30,6 +30,6 @@ pub enum StagingError { ObjectStorage(#[from] std::io::Error), #[error("Could not generate parquet file")] Create, - // #[error("Metadata Error: {0}")] - // Metadata(#[from] MetadataError), + #[error("Too many rows: {0}")] + RowLimit(usize), } diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index ce1312249..2c580a4a8 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -37,13 +37,13 @@ use crate::utils::arrow::adapt_batch; use super::StagingError; /// Context regarding `.arrows` file being persisted onto disk -pub struct DiskWriter { +pub struct DiskWriter { inner: FileWriter>, /// Used to ensure un"finish"ed arrow files are renamed on "finish" path_prefix: String, } -impl DiskWriter { +impl DiskWriter { pub fn new(path_prefix: String, schema: &Schema) -> Result { // Live writes happen into partfile let partfile_path = format!("{path_prefix}.{ARROW_PART_FILE_EXTENSION}"); @@ -81,12 +81,6 @@ impl DiskWriter { } } -#[derive(Default)] -pub struct Writer { - pub mem: MemWriter, - pub disk: HashMap>, -} - /// Structure to keep recordbatches in memory. /// /// Any new schema is updated in the schema map. @@ -178,3 +172,9 @@ impl MutableBuffer { } } } + +#[derive(Default)] +pub struct Writer { + pub mem: MemWriter, + pub disk: HashMap, +} diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 47fdeb7bb..d19abd978 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -66,6 +66,9 @@ use super::{ LogStream, ARROW_FILE_EXTENSION, }; +// ~16K rows is default in-memory limit for each recordbatch +const MAX_RECORD_BATCH_SIZE: usize = 16384; + /// Regex pattern for parsing arrow file names. 
/// /// # Format @@ -113,8 +116,8 @@ pub struct Stream { pub metadata: RwLock, pub data_path: PathBuf, pub options: Arc, - /// Writer with a 16KB buffer size for optimal I/O performance. - pub writer: Mutex>, + /// Writer with a ~16K rows limit for optimal I/O performance. + pub writer: Mutex>, pub ingestor_id: Option, } @@ -147,6 +150,11 @@ impl Stream { custom_partition_values: &HashMap, stream_type: StreamType, ) -> Result<(), StagingError> { + let row_count = record.num_rows(); + if row_count > MAX_RECORD_BATCH_SIZE { + return Err(StagingError::RowLimit(row_count)); + } + let mut guard = self.writer.lock().unwrap(); if self.options.mode != Mode::Query || stream_type == StreamType::Internal { match guard.disk.get_mut(schema_key) { From 78a141e6a330937246dc9e74b7b03c46e6da0381 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 28 Feb 2025 14:41:50 +0530 Subject: [PATCH 25/31] fix: don't count `part.arrows` --- src/parseable/streams.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 4725d540d..caf531b99 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -225,8 +225,10 @@ impl Stream { .flatten() .map(|file| file.path()) .filter(|path| { - path.file_name() - .is_some_and(|f| f.to_string_lossy().ends_with(ARROW_FILE_EXTENSION)) + path.file_name().is_some_and(|f| { + let filename = f.to_string_lossy(); + filename.ends_with(ARROW_FILE_EXTENSION) && !filename.contains("part") + }) }) .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); From b3037ffdbdd5123f726591d0629e60789eaa4a3e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 28 Feb 2025 14:49:01 +0530 Subject: [PATCH 26/31] match with regex --- src/parseable/streams.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index caf531b99..d53f1c37a 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -225,10 +225,10 @@ impl Stream { .flatten() .map(|file| file.path()) .filter(|path| { - path.file_name().is_some_and(|f| { - let filename = f.to_string_lossy(); - filename.ends_with(ARROW_FILE_EXTENSION) && !filename.contains("part") - }) + let Some(file_name) = path.file_name().and_then(|f| f.to_str()) else { + return false; + }; + ARROWS_NAME_STRUCTURE.is_match(file_name) }) .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); From 3cd83efe17cd615da175be7623487be730121a2d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 14 Mar 2025 21:49:41 +0530 Subject: [PATCH 27/31] fix: reverse ordering of rbs --- src/parseable/staging/reader.rs | 62 ++++++++++++++++++++++++--------- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 32ec8f0d0..99818c763 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -20,13 +20,13 @@ use std::{ fs::{remove_file, File}, io::BufReader, - path::PathBuf, + path::{Path, PathBuf}, sync::Arc, }; use arrow_array::{RecordBatch, TimestampMillisecondArray}; use arrow_ipc::reader::FileReader; -use arrow_schema::Schema; +use arrow_schema::{ArrowError, Schema, SchemaRef}; use itertools::kmerge_by; use tracing::error; @@ -35,28 +35,58 @@ use crate::{ utils::arrow::{adapt_batch, reverse}, }; +#[derive(Debug)] +pub struct ReverseReader { + inner: FileReader>, + idx: usize, +} + +impl ReverseReader { + fn try_new(path: impl AsRef) -> Result { + let inner = 
FileReader::try_new(BufReader::new(File::open(path).unwrap()), None)?; + let idx = inner.num_batches(); + + Ok(Self { inner, idx }) + } + + fn schema(&self) -> SchemaRef { + self.inner.schema() + } +} + +impl Iterator for ReverseReader { + type Item = Result; + + fn next(&mut self) -> Option { + if self.idx <= 0 { + return None; + } + + self.idx -= 1; + if let Err(e) = self.inner.set_index(self.idx) { + return Some(Err(e)); + } + + self.inner.next() + } +} + #[derive(Debug)] pub struct MergedRecordReader { - pub readers: Vec>>, + pub readers: Vec, } impl MergedRecordReader { - pub fn new(files: &[PathBuf]) -> Self { - let mut readers = Vec::with_capacity(files.len()); + pub fn new(paths: &[PathBuf]) -> Self { + let mut readers = Vec::with_capacity(paths.len()); - for file in files { + for path in paths { //remove empty files before reading - if file.metadata().unwrap().len() == 0 { - error!("Invalid file detected, removing it: {:?}", file); - remove_file(file).unwrap(); + if path.metadata().unwrap().len() == 0 { + error!("Invalid file detected, removing it: {path:?}"); + remove_file(path).unwrap(); } else { - let Ok(reader) = - FileReader::try_new(BufReader::new(File::open(file).unwrap()), None) - else { - error!("Invalid file detected, ignoring it: {:?}", file); - continue; - }; - + let reader = ReverseReader::try_new(path).unwrap(); readers.push(reader); } } From c48499e25bff89a42779bcfc64db0ee4e9de4d12 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 14 Mar 2025 21:50:57 +0530 Subject: [PATCH 28/31] clippy suggestion --- src/parseable/staging/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index 99818c763..d498e320a 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -58,7 +58,7 @@ impl Iterator for ReverseReader { type Item = Result; fn next(&mut self) -> Option { - if self.idx <= 0 { + if self.idx == 0 { return None; } From d0159f1f39b7d73dcafa2e3904721a4165e48c1e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 15 Mar 2025 01:13:52 +0530 Subject: [PATCH 29/31] style: coderabbit suggestion --- src/parseable/staging/writer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index 33a76ab93..26ca9f21a 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -53,8 +53,7 @@ impl DiskWriter { .open(partfile_path)?; Ok(Self { - inner: FileWriter::try_new_buffered(file, schema) - .expect("File and RecordBatch both are checked"), + inner: FileWriter::try_new_buffered(file, schema)?, path_prefix, }) } From ec31cfbbabef8453dc394418d64db551e9dc4843 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Mar 2025 22:38:50 +0530 Subject: [PATCH 30/31] doc+test: improvements --- src/parseable/staging/reader.rs | 172 +++++++++++++++++++++++++++++++- 1 file changed, 167 insertions(+), 5 deletions(-) diff --git a/src/parseable/staging/reader.rs b/src/parseable/staging/reader.rs index efa5552c3..6def41af0 100644 --- a/src/parseable/staging/reader.rs +++ b/src/parseable/staging/reader.rs @@ -35,21 +35,29 @@ use crate::{ utils::arrow::{adapt_batch, reverse}, }; +/// `ReverseReader` provides an iterator over record batches in an Arrow IPC file format +/// in reverse order (from the last batch to the first). 
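+///
+/// A minimal sketch of intended use, assuming `path` points at a finished `.arrows`
+/// file in staging (the variable name here is illustrative only):
+///
+/// ```ignore
+/// let reader = ReverseReader::try_new(&path)?;
+/// for batch in reader {
+///     // Most recently written batch comes first; each item is a
+///     // `Result<RecordBatch, ArrowError>`.
+///     println!("rows: {}", batch?.num_rows());
+/// }
+/// ```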
+/// +/// This is useful for scenarios where you need to process the most recent data first, +/// or when implementing time-series data exploration that starts with the latest records. #[derive(Debug)] pub struct ReverseReader { inner: FileReader>, + /// Current index for iteration (starts from the last batch) idx: usize, } impl ReverseReader { - fn try_new(path: impl AsRef) -> Result { + /// Creates a new `ReverseReader` from given path. + pub fn try_new(path: impl AsRef) -> Result { let inner = FileReader::try_new(BufReader::new(File::open(path).unwrap()), None)?; let idx = inner.num_batches(); Ok(Self { inner, idx }) } - fn schema(&self) -> SchemaRef { + /// Returns the schema of the underlying Arrow file. + pub fn schema(&self) -> SchemaRef { self.inner.schema() } } @@ -57,6 +65,9 @@ impl ReverseReader { impl Iterator for ReverseReader { type Item = Result; + /// Returns the next record batch in reverse order(latest to the first) from arrows file. + /// + /// Returns `None` when all batches have been processed. fn next(&mut self) -> Option { if self.idx == 0 { return None; @@ -158,19 +169,27 @@ fn get_default_timestamp_millis(batch: &RecordBatch) -> i64 { #[cfg(test)] mod tests { - use std::{fs::File, io, path::Path, sync::Arc}; + use std::{ + fs::File, + io::{self, Write}, + path::{Path, PathBuf}, + sync::Arc, + }; use arrow_array::{ cast::AsArray, types::Int64Type, Array, Float64Array, Int32Array, Int64Array, RecordBatch, StringArray, }; use arrow_ipc::{reader::FileReader, writer::FileWriter}; - use arrow_schema::{DataType, Field, Schema}; + use arrow_schema::{ArrowError, DataType, Field, Schema}; use chrono::Utc; use temp_dir::TempDir; use crate::{ - parseable::staging::{reader::MergedRecordReader, writer::DiskWriter}, + parseable::staging::{ + reader::{MergedRecordReader, ReverseReader}, + writer::DiskWriter, + }, utils::time::TimeRange, OBJECT_STORE_DATA_GRANULARITY, }; @@ -403,4 +422,147 @@ mod tests { Ok(()) } + + fn create_test_arrow_file(path: &PathBuf, num_batches: usize) -> Result<(), ArrowError> { + // Create schema + let schema = Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + ]); + let schema_ref = std::sync::Arc::new(schema); + + // Create file and writer + let file = File::create(path)?; + let mut writer = FileWriter::try_new(file, &schema_ref)?; + + // Create and write batches + for i in 0..num_batches { + let id_array = + Int32Array::from(vec![i as i32 * 10, i as i32 * 10 + 1, i as i32 * 10 + 2]); + let name_array = StringArray::from(vec![ + format!("batch_{i}_name_0"), + format!("batch_{i}_name_1"), + format!("batch_{i}_name_2"), + ]); + + let batch = RecordBatch::try_new( + schema_ref.clone(), + vec![ + std::sync::Arc::new(id_array), + std::sync::Arc::new(name_array), + ], + )?; + + writer.write(&batch)?; + } + + writer.finish()?; + Ok(()) + } + + #[test] + fn test_reverse_reader_creation() { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("test.arrow"); + + // Create test file with 3 batches + create_test_arrow_file(&file_path, 3).unwrap(); + + // Test successful creation + let reader = ReverseReader::try_new(&file_path); + assert!(reader.is_ok()); + + // Test schema retrieval + let reader = reader.unwrap(); + let schema = reader.schema(); + assert_eq!(schema.fields().len(), 2); + assert_eq!(schema.field(0).name(), "id"); + assert_eq!(schema.field(1).name(), "name"); + } + + #[test] + fn test_reverse_reader_iteration() { + let temp_dir = TempDir::new().unwrap(); + let 
file_path = temp_dir.path().join("test.arrow"); + + // Create test file with 3 batches + create_test_arrow_file(&file_path, 3).unwrap(); + + // Create reader and iterate + let reader = ReverseReader::try_new(&file_path).unwrap(); + let batches: Vec<_> = reader.collect::, _>>().unwrap(); + + // Verify correct number of batches + assert_eq!(batches.len(), 3); + + // Verify reverse order + // Batch 2 (last written, first read) + let batch0 = &batches[0]; + assert_eq!(batch0.num_columns(), 2); + let id_array = batch0 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_array.value(0), 20); + + // Batch 1 (middle) + let batch1 = &batches[1]; + let id_array = batch1 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_array.value(0), 10); + + // Batch 0 (first written, last read) + let batch2 = &batches[2]; + let id_array = batch2 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_array.value(0), 0); + } + + #[test] + fn test_empty_file() { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("empty.arrow"); + + // Create empty file with schema but no batches + create_test_arrow_file(&file_path, 0).unwrap(); + + let reader = ReverseReader::try_new(&file_path).unwrap(); + let batches: Vec<_> = reader.collect::, _>>().unwrap(); + + // Should be empty + assert_eq!(batches.len(), 0); + } + + #[test] + fn test_invalid_file() { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("invalid.txt"); + + // Create a non-Arrow file + let mut file = File::create(&file_path).unwrap(); + writeln!(&mut file, "This is not an Arrow file").unwrap(); + + // Attempting to create a reader should fail + let reader = ReverseReader::try_new(&file_path); + assert!(reader.is_err()); + } + + #[test] + fn test_num_batches() { + let temp_dir = TempDir::new().unwrap(); + let file_path = temp_dir.path().join("test.arrow"); + + // Create test file with 5 batches + create_test_arrow_file(&file_path, 5).unwrap(); + + let reader = ReverseReader::try_new(&file_path).unwrap(); + assert_eq!(reader.count(), 5); + } } From 51eabca18bce9dfb7e8b4e14b43d7ae6b57caf22 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 17 Mar 2025 00:07:32 +0530 Subject: [PATCH 31/31] refactor: declutter by separating out into #1239 --- src/parseable/streams.rs | 60 ++++++++++------------------------------ 1 file changed, 14 insertions(+), 46 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index a3029cef6..f1d4fd6ca 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -32,7 +32,6 @@ use arrow_schema::{Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use derive_more::{Deref, DerefMut}; use itertools::Itertools; -use once_cell::sync::Lazy; use parquet::{ arrow::ArrowWriter, basic::Encoding, @@ -41,7 +40,6 @@ use parquet::{ schema::types::ColumnPath, }; use rand::distributions::DistString; -use regex::Regex; use relative_path::RelativePathBuf; use tokio::task::JoinSet; use tracing::{error, info, trace, warn}; @@ -72,41 +70,6 @@ use super::{ // ~16K rows is default in-memory limit for each recordbatch const MAX_RECORD_BATCH_SIZE: usize = 16384; -/// Regex pattern for parsing arrow file names. 
-/// -/// # Format -/// The expected format is: `..data.arrows` -/// where: -/// - schema_key: A key that is associated with the timestamp at ingestion, hash of arrow schema and the key-value -/// pairs joined by '&' and '=' (e.g., "20200201T1830f8a5fc1edc567d56&key1=value1&key2=value2") -/// - front_part: Captured for parquet file naming, contains the timestamp associted with current/time-partition -/// as well as the custom partitioning key=value pairs (e.g., "date=2020-01-21.hour=10.minute=30.key1=value1.key2=value2.ee529ffc8e76") -/// -/// # Limitations -/// - Partition keys and values must only contain alphanumeric characters -/// - Special characters in partition values will cause the pattern to fail in capturing -/// -/// # Examples -/// Valid: "key1=value1,key2=value2" -/// Invalid: "key1=special!value,key2=special#value" -static ARROWS_NAME_STRUCTURE: Lazy = Lazy::new(|| { - Regex::new(r"^[a-zA-Z0-9&=]+\.(?P\S+)\.data\.arrows$").expect("Validated regex") -}); - -/// Returns the filename for parquet if provided arrows file path is valid as per our expectation -fn arrow_path_to_parquet(path: &Path, random_string: &str) -> Option { - let filename = path.file_name().unwrap().to_str().unwrap(); - let filename = ARROWS_NAME_STRUCTURE - .captures(filename) - .and_then(|c| c.get(1))? - .as_str(); - let filename_with_random_number = format!("{filename}.data.{random_string}.parquet"); - let mut parquet_path = path.to_owned(); - parquet_path.set_file_name(filename_with_random_number); - - Some(parquet_path) -} - #[derive(Debug, thiserror::Error)] #[error("Stream not found: {0}")] pub struct StreamNotFound(pub String); @@ -228,12 +191,7 @@ impl Stream { let paths = dir .flatten() .map(|file| file.path()) - .filter(|path| { - let Some(file_name) = path.file_name().and_then(|f| f.to_str()) else { - return false; - }; - ARROWS_NAME_STRUCTURE.is_match(file_name) - }) + .filter(|file| file.extension().is_some_and(|ext| ext.eq("arrows"))) .sorted_by_key(|f| f.metadata().unwrap().modified().unwrap()) .collect(); @@ -276,13 +234,12 @@ impl Stream { &arrow_file_path, self.stream_name ); remove_file(&arrow_file_path).unwrap(); - } else if let Some(key) = arrow_path_to_parquet(&arrow_file_path, &random_string) { + } else { + let key = Self::arrow_path_to_parquet(&arrow_file_path, &random_string); grouped_arrow_file .entry(key) .or_default() .push(arrow_file_path); - } else { - warn!("Unexpected arrows file: {}", arrow_file_path.display()); } } grouped_arrow_file @@ -341,6 +298,17 @@ impl Stream { } } + fn arrow_path_to_parquet(path: &Path, random_string: &str) -> PathBuf { + let filename = path.file_stem().unwrap().to_str().unwrap(); + let (_, filename) = filename.split_once('.').unwrap(); + assert!(filename.contains('.'), "contains the delim `.`"); + let filename_with_random_number = format!("{filename}.{random_string}.arrows"); + let mut parquet_path = path.to_owned(); + parquet_path.set_file_name(filename_with_random_number); + parquet_path.set_extension("parquet"); + parquet_path + } + /// Converts arrow files in staging into parquet files, does so only for past minutes when run with `!shutdown_signal` pub fn prepare_parquet(&self, shutdown_signal: bool) -> Result<(), StagingError> { info!(