Skip to content

Commit 0c97d74

Browse files
trueleonitisht
andauthored
Fix staging arrow extension (#392)
Co-authored-by: Nitish Tiwari <[email protected]>
1 parent bee5528 commit 0c97d74

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

server/src/storage/staging.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use parquet::{
3838
};
3939

4040
use crate::{
41+
event::DEFAULT_TIMESTAMP_KEY,
4142
metrics,
4243
option::CONFIG,
4344
storage::OBJECT_STORE_DATA_GRANULARITY,
@@ -47,6 +48,9 @@ use crate::{
4748
},
4849
};
4950

51+
const ARROW_FILE_EXTENSION: &str = "data.arrows";
52+
const PARQUET_FILE_EXTENSION: &str = "data.parquet";
53+
5054
// in mem global that hold all the in mem buffer that are ready to convert
5155
pub static MEMORY_READ_BUFFERS: Lazy<RwLock<HashMap<String, Vec<ReadBuf>>>> =
5256
Lazy::new(RwLock::default);
@@ -93,7 +97,7 @@ impl StorageDir {
9397
format!(
9498
"{}.{}",
9599
stream_hash,
96-
Self::file_time_suffix(time, "data.arrows")
100+
Self::file_time_suffix(time, ARROW_FILE_EXTENSION)
97101
)
98102
}
99103

@@ -140,8 +144,8 @@ impl StorageDir {
140144
&self,
141145
exclude: NaiveDateTime,
142146
) -> HashMap<PathBuf, Vec<PathBuf>> {
143-
let hot_filename = StorageDir::file_time_suffix(exclude, "data.arrow");
144-
// hashmap <time, vec[paths]> but exclude where hotfilename matches
147+
let hot_filename = StorageDir::file_time_suffix(exclude, ARROW_FILE_EXTENSION);
148+
// hashmap <time, vec[paths]> but exclude where hot filename matches
145149
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
146150
let mut arrow_files = self.arrow_files();
147151
arrow_files.retain(|path| {
@@ -185,7 +189,7 @@ impl StorageDir {
185189

186190
pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
187191
let data_path = CONFIG.parseable.local_stream_data_path(stream_name);
188-
let dir = StorageDir::file_time_suffix(time, "data.parquet");
192+
let dir = StorageDir::file_time_suffix(time, PARQUET_FILE_EXTENSION);
189193

190194
data_path.join(dir)
191195
}
@@ -278,7 +282,7 @@ fn parquet_writer_props() -> WriterPropertiesBuilder {
278282
.set_max_row_group_size(CONFIG.parseable.row_group_size)
279283
.set_compression(CONFIG.parseable.parquet_compression.into())
280284
.set_column_encoding(
281-
ColumnPath::new(vec!["p_timestamp".to_string()]),
285+
ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]),
282286
Encoding::DELTA_BINARY_PACKED,
283287
)
284288
}

0 commit comments

Comments
 (0)