Commit e456d67

Author: Devdutt Shenoi (committed)
Merge remote-tracking branch 'origin/main' into filewriter
2 parents d0159f1 + d147f48

File tree

8 files changed, +334 -206 lines


Cargo.lock

Lines changed: 116 additions & 101 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 10 additions & 9 deletions
@@ -9,15 +9,16 @@ build = "build.rs"
 
 [dependencies]
 # Arrow and DataFusion ecosystem
-arrow-array = { version = "53.0.0" }
-arrow-flight = { version = "53.0.0", features = ["tls"] }
-arrow-ipc = { version = "53.0.0", features = ["zstd"] }
-arrow-json = "53.0.0"
-arrow-schema = { version = "53.0.0", features = ["serde"] }
-arrow-select = "53.0.0"
-datafusion = "44.0.0"
+arrow = "54.0.0"
+arrow-array = "54.0.0"
+arrow-flight = { version = "54.0.0", features = ["tls"] }
+arrow-ipc = { version = "54.0.0", features = ["zstd"] }
+arrow-json = "54.0.0"
+arrow-schema = { version = "54.0.0", features = ["serde"] }
+arrow-select = "54.0.0"
+datafusion = "45.0.0"
 object_store = { version = "0.11.2", features = ["cloud", "aws", "azure"] }
-parquet = "53.0.0"
+parquet = "54.0.0"
 
 # Web server and HTTP-related
 actix-cors = "0.7.0"
@@ -134,7 +135,7 @@ anyhow = "1.0"
 
 [dev-dependencies]
 rstest = "0.23.0"
-arrow = "53.0.0"
+arrow = "54.0.0"
 temp-dir = "0.1.14"
 
 [package.metadata.parseable_ui]

src/parseable/mod.rs

Lines changed: 3 additions & 0 deletions
@@ -67,6 +67,9 @@ mod streams;
 /// File extension for arrow files in staging
 const ARROW_FILE_EXTENSION: &str = "arrows";
 
+/// File extension for incomplete arrow files
+const PART_FILE_EXTENSION: &str = "part";
+
 /// Name of a Stream
 /// NOTE: this used to be a struct, flattened out for simplicity
 pub type LogStream = String;
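The new constant pairs with ARROW_FILE_EXTENSION above it: live writes go into a `.part` file, which is renamed to `.arrows` once finished (see the writer changes further down). A minimal std-only sketch of the extension swap; the file name is made up for illustration:

use std::path::PathBuf;

fn main() {
    // `set_extension` replaces only the final extension, so the `.data`
    // segment of the staging file name survives the round trip.
    let mut path = PathBuf::from("test.data.arrows");

    path.set_extension("part"); // PART_FILE_EXTENSION: in-progress writes
    assert_eq!(path, PathBuf::from("test.data.part"));

    path.set_extension("arrows"); // ARROW_FILE_EXTENSION: finished file
    assert_eq!(path, PathBuf::from("test.data.arrows"));
}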

src/parseable/staging/reader.rs

Lines changed: 11 additions & 6 deletions
@@ -166,9 +166,14 @@ mod tests {
     };
     use arrow_ipc::{reader::FileReader, writer::FileWriter};
     use arrow_schema::{DataType, Field, Schema};
+    use chrono::Utc;
     use temp_dir::TempDir;
 
-    use crate::parseable::staging::reader::MergedRecordReader;
+    use crate::{
+        parseable::staging::{reader::MergedRecordReader, writer::DiskWriter},
+        utils::time::TimeRange,
+        OBJECT_STORE_DATA_GRANULARITY,
+    };
 
     fn rb(rows: usize) -> RecordBatch {
         let array1: Arc<dyn Array> = Arc::new(Int64Array::from_iter(0..rows as i64));
@@ -293,21 +298,21 @@ mod tests {
         schema: &Arc<Schema>,
         batches: &[RecordBatch],
     ) -> io::Result<()> {
-        let file = File::create(path)?;
-        let mut writer = FileWriter::try_new(file, schema).expect("Failed to create StreamWriter");
+        let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);
+        let mut writer =
+            DiskWriter::try_new(path, schema, range).expect("Failed to create StreamWriter");
 
         for batch in batches {
             writer.write(batch).expect("Failed to write batch");
         }
 
-        writer.finish().expect("Failed to finalize writer");
         Ok(())
     }
 
     #[test]
     fn test_merged_reverse_record_reader() -> io::Result<()> {
         let dir = TempDir::new().unwrap();
-        let file_path = dir.path().join("test.arrow");
+        let file_path = dir.path().join("test.data.arrows");
 
         // Create a schema
         let schema = Arc::new(Schema::new(vec![
@@ -369,7 +374,7 @@ mod tests {
     #[test]
     fn test_get_reverse_reader_single_message() -> io::Result<()> {
         let dir = TempDir::new().unwrap();
-        let file_path = dir.path().join("test_single.arrow");
+        let file_path = dir.path().join("test_single.data.arrows");
 
         // Create a schema
         let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
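Assembled from the hunks above, the test helper now stages batches through DiskWriter instead of a raw FileWriter, and the explicit finish() call is gone because the writer finalizes and renames its file when dropped at the end of the function; the test paths switch to the `.data.arrows` form so the name the reader opens matches what DiskWriter leaves on disk. A sketch of the full helper under those assumptions (its name and the `path` parameter are not visible in the hunk and are guessed here):

fn write_file(
    path: &Path,
    schema: &Arc<Schema>,
    batches: &[RecordBatch],
) -> io::Result<()> {
    let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);
    let mut writer =
        DiskWriter::try_new(path, schema, range).expect("Failed to create StreamWriter");

    for batch in batches {
        writer.write(batch).expect("Failed to write batch");
    }

    // No explicit finish(): dropping `writer` writes the IPC footer and
    // renames the `.part` file to its final `.arrows` name.
    Ok(())
}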

src/parseable/staging/writer.rs

Lines changed: 50 additions & 33 deletions
@@ -21,62 +21,85 @@ use std::{
     collections::{HashMap, HashSet},
     fs::{File, OpenOptions},
     io::BufWriter,
+    path::PathBuf,
     sync::Arc,
 };
 
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::FileWriter;
 use arrow_schema::Schema;
 use arrow_select::concat::concat_batches;
+use chrono::Utc;
 use itertools::Itertools;
-use tracing::trace;
+use tracing::{error, warn};
 
-use crate::parseable::ARROW_FILE_EXTENSION;
-use crate::utils::arrow::adapt_batch;
+use crate::{
+    parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
+    utils::{arrow::adapt_batch, time::TimeRange},
+};
 
 use super::StagingError;
 
+#[derive(Default)]
+pub struct Writer {
+    pub mem: MemWriter<16384>,
+    pub disk: HashMap<String, DiskWriter>,
+}
+
 /// Context regarding `.arrows` file being persisted onto disk
 pub struct DiskWriter {
     inner: FileWriter<BufWriter<File>>,
-    /// Used to ensure un"finish"ed arrow files are renamed on "finish"
-    path_prefix: String,
+    path: PathBuf,
+    range: TimeRange,
 }
 
 impl DiskWriter {
-    pub fn new(path_prefix: String, schema: &Schema) -> Result<Self, StagingError> {
-        // Live writes happen into partfile
-        let partfile_path = format!("{path_prefix}.part.{ARROW_FILE_EXTENSION}");
+    /// Try to create a file to stream arrows into
+    pub fn try_new(
+        path: impl Into<PathBuf>,
+        schema: &Schema,
+        range: TimeRange,
+    ) -> Result<Self, StagingError> {
+        let mut path = path.into();
+        path.set_extension(PART_FILE_EXTENSION);
         let file = OpenOptions::new()
+            .write(true)
+            .truncate(true)
             .create(true)
-            .append(true)
-            .open(partfile_path)?;
+            .open(&path)?;
+        let inner = FileWriter::try_new_buffered(file, schema)?;
 
-        Ok(Self {
-            inner: FileWriter::try_new_buffered(file, schema)?,
-            path_prefix,
-        })
+        Ok(Self { inner, path, range })
    }
 
-    /// Appends records into a `.part.arrows` file
-    pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
-        self.inner.write(rb)?;
+    pub fn is_current(&self) -> bool {
+        self.range.contains(Utc::now())
+    }
 
-        Ok(())
+    /// Write a single recordbatch into file
+    pub fn write(&mut self, rb: &RecordBatch) -> Result<(), StagingError> {
+        self.inner.write(rb).map_err(StagingError::Arrow)
    }
+}
 
-    /// Ensures `.arrows`` file in staging directory is "finish"ed and renames it from "part".
-    pub fn finish(&mut self) -> Result<(), StagingError> {
-        self.inner.finish()?;
+impl Drop for DiskWriter {
+    /// Write the continuation bytes and mark the file as done, rename to `.data.arrows`
+    fn drop(&mut self) {
+        if let Err(err) = self.inner.finish() {
+            error!("Couldn't finish arrow file {:?}, error = {err}", self.path);
+            return;
+        }
 
-        let partfile_path = format!("{}.part.{ARROW_FILE_EXTENSION}", self.path_prefix);
-        let arrows_path = format!("{}.data.{ARROW_FILE_EXTENSION}", self.path_prefix);
+        let mut arrow_path = self.path.to_owned();
+        arrow_path.set_extension(ARROW_FILE_EXTENSION);
 
-        // Rename from part file to finished arrows file
-        std::fs::rename(partfile_path, &arrows_path)?;
-        trace!("Finished arrows file: {arrows_path}");
+        if arrow_path.exists() {
+            warn!("File {arrow_path:?} exists and will be overwritten");
+        }
 
-        Ok(())
+        if let Err(err) = std::fs::rename(&self.path, &arrow_path) {
+            error!("Couldn't rename file {:?}, error = {err}", self.path);
+        }
     }
 }
 
@@ -171,9 +194,3 @@ impl<const N: usize> MutableBuffer<N> {
         }
     }
 }
-
-#[derive(Default)]
-pub struct Writer<const N: usize> {
-    pub mem: MemWriter<N>,
-    pub disk: HashMap<String, DiskWriter>,
-}
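Taken together, DiskWriter now owns the whole lifecycle of a staged file: try_new swaps the extension to `.part` and opens it for writing, write appends batches, is_current reports whether the writer's time range still covers the current wall-clock time, and Drop finishes the IPC file and renames it to the `.arrows` extension, logging (rather than returning) any failure. A rough usage sketch against the API in this diff; the module paths mirror the imports shown in these hunks, while the staging path and rotation policy are only illustrative:

use arrow_array::RecordBatch;
use arrow_schema::Schema;
use chrono::Utc;

use crate::{
    parseable::staging::{writer::DiskWriter, StagingError},
    utils::time::TimeRange,
    OBJECT_STORE_DATA_GRANULARITY,
};

fn stage(batches: &[RecordBatch], schema: &Schema) -> Result<(), StagingError> {
    let range = TimeRange::granularity_range(Utc::now(), OBJECT_STORE_DATA_GRANULARITY);
    // Live writes land in "staging/ingest.data.part" (extension swapped by try_new).
    let mut writer = DiskWriter::try_new("staging/ingest.data.arrows", schema, range)?;

    for rb in batches {
        // A writer whose time window has passed should be rotated out;
        // this break is only a stand-in for the real rotation logic.
        if !writer.is_current() {
            break;
        }
        writer.write(rb)?;
    }

    // `writer` is dropped on return: the IPC footer is written and the
    // `.part` file is renamed to "staging/ingest.data.arrows".
    Ok(())
}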
