Commit 7fb5840
vendor: Update vendored sources to duckdb/duckdb@5141aa4
Combine small row groups in Parquet writer (duckdb/duckdb#17036)
1 parent: deca202

File tree

5 files changed (+50 −6 lines)

src/duckdb/extension/parquet/include/writer/templated_column_writer.hpp

Lines changed: 1 addition & 0 deletions

```diff
@@ -224,6 +224,7 @@ class StandardColumnWriter : public PrimitiveColumnWriter {
 
 	auto &state = state_p.Cast<StandardColumnWriterState<SRC, TGT, OP>>();
 	if (state.dictionary.GetSize() == 0 || state.dictionary.IsFull()) {
+		state.dictionary.Reset();
 		if (writer.GetParquetVersion() == ParquetVersion::V1) {
 			// Can't do the cool stuff for V1
 			state.encoding = duckdb_parquet::Encoding::PLAIN;
```

src/duckdb/extension/parquet/parquet_extension.cpp

Lines changed: 35 additions & 3 deletions

```diff
@@ -781,6 +781,9 @@ struct ParquetWriteBindData : public TableFunctionData {
 
 struct ParquetWriteGlobalState : public GlobalFunctionData {
 	unique_ptr<ParquetWriter> writer;
+
+	mutex lock;
+	unique_ptr<ColumnDataCollection> combine_buffer;
 };
 
 struct ParquetWriteLocalState : public LocalFunctionData {
@@ -986,16 +989,45 @@ void ParquetWriteSink(ExecutionContext &context, FunctionData &bind_data_p, Glob
 	}
 }
 
-void ParquetWriteCombine(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
+void ParquetWriteCombine(ExecutionContext &context, FunctionData &bind_data_p, GlobalFunctionData &gstate,
                          LocalFunctionData &lstate) {
+	auto &bind_data = bind_data_p.Cast<ParquetWriteBindData>();
 	auto &global_state = gstate.Cast<ParquetWriteGlobalState>();
 	auto &local_state = lstate.Cast<ParquetWriteLocalState>();
-	// flush any data left in the local state to the file
-	global_state.writer->Flush(local_state.buffer);
+
+	if (local_state.buffer.Count() >= bind_data.row_group_size / 2 ||
+	    local_state.buffer.SizeInBytes() >= bind_data.row_group_size_bytes / 2) {
+		// local state buffer is more than half of the row_group_size(_bytes), just flush it
+		global_state.writer->Flush(local_state.buffer);
+		return;
+	}
+
+	unique_lock<mutex> guard(global_state.lock);
+	if (global_state.combine_buffer) {
+		// There is still some data, combine it
+		global_state.combine_buffer->Combine(local_state.buffer);
+		if (global_state.combine_buffer->Count() >= bind_data.row_group_size / 2 ||
+		    global_state.combine_buffer->SizeInBytes() >= bind_data.row_group_size_bytes / 2) {
+			// After combining, the combine buffer is more than half of the row_group_size(_bytes), so we flush
+			auto owned_combine_buffer = std::move(global_state.combine_buffer);
+			guard.unlock();
+			// Lock free, of course
+			global_state.writer->Flush(*owned_combine_buffer);
+		}
+		return;
+	}
+
+	global_state.combine_buffer = make_uniq<ColumnDataCollection>(context.client, local_state.buffer.Types());
+	global_state.combine_buffer->Combine(local_state.buffer);
 }
 
 void ParquetWriteFinalize(ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate) {
 	auto &global_state = gstate.Cast<ParquetWriteGlobalState>();
+	// flush the combine buffer (if it's there)
+	if (global_state.combine_buffer) {
+		global_state.writer->Flush(*global_state.combine_buffer);
+	}
+
 	// finalize: write any additional metadata to the file here
 	global_state.writer->Finalize();
 }
```
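The combine step above is the heart of the change: a thread-local buffer holding at least half a row group (by row count or byte size) is flushed as its own row group, while smaller leftovers are merged under a mutex into a shared combine_buffer, which is flushed once it crosses the same threshold; ParquetWriteFinalize drains whatever remains. Below is a minimal standalone sketch of that policy, using plain STL types in place of DuckDB's ColumnDataCollection and ParquetWriter, and only the row-count threshold for brevity; Buffer, Writer, and CombineState are hypothetical stand-ins, not DuckDB APIs.

```cpp
#include <cstdint>
#include <cstdio>
#include <memory>
#include <mutex>
#include <vector>

struct Buffer {
	std::vector<int64_t> rows;
	size_t Count() const { return rows.size(); }
	void Combine(Buffer &other) {
		// append the other buffer's rows and leave it empty
		rows.insert(rows.end(), other.rows.begin(), other.rows.end());
		other.rows.clear();
	}
};

struct Writer {
	void Flush(Buffer &buf) {
		// stand-in for writing one Parquet row group
		std::printf("flushing row group of %zu rows\n", buf.Count());
		buf.rows.clear();
	}
};

struct CombineState {
	std::mutex lock;
	std::unique_ptr<Buffer> combine_buffer;
};

// Mirrors the shape of ParquetWriteCombine: big local buffers are flushed
// directly; small ones are parked in a shared buffer until it reaches half
// a row group.
void Combine(CombineState &global, Writer &writer, Buffer &local, size_t row_group_size) {
	if (local.Count() >= row_group_size / 2) {
		writer.Flush(local); // large enough to be its own row group
		return;
	}
	std::unique_lock<std::mutex> guard(global.lock);
	if (global.combine_buffer) {
		global.combine_buffer->Combine(local);
		if (global.combine_buffer->Count() >= row_group_size / 2) {
			auto owned = std::move(global.combine_buffer);
			guard.unlock(); // flush outside the lock, as in the patch
			writer.Flush(*owned);
		}
		return;
	}
	global.combine_buffer = std::make_unique<Buffer>();
	global.combine_buffer->Combine(local);
}

int main() {
	CombineState global;
	Writer writer;
	const size_t row_group_size = 8;
	// four small buffers of 3 rows each: each pair is combined in the shared
	// buffer and flushed once it reaches 6 rows (>= half of 8)
	for (int i = 0; i < 4; i++) {
		Buffer local;
		local.rows = {1, 2, 3};
		Combine(global, writer, local, row_group_size);
	}
	// finalize: drain whatever is left, as ParquetWriteFinalize does
	if (global.combine_buffer) {
		writer.Flush(*global.combine_buffer);
	}
}
```

Note the same detail as in the patch: the shared buffer is moved out of the global state before the mutex is released, so the potentially slow flush runs outside the critical section without another thread observing a half-flushed buffer.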

src/duckdb/extension/parquet/writer/primitive_column_writer.cpp

Lines changed: 6 additions & 0 deletions

```diff
@@ -391,6 +391,12 @@ void PrimitiveColumnWriter::WriteDictionary(PrimitiveColumnWriterState &state, u
 	                                write_info.compressed_buf);
 	hdr.compressed_page_size = UnsafeNumericCast<int32_t>(write_info.compressed_size);
 
+	if (write_info.compressed_buf) {
+		// if the data has been compressed, we no longer need the uncompressed data
+		D_ASSERT(write_info.compressed_buf.get() == write_info.compressed_data);
+		write_info.temp_writer.reset();
+	}
+
 	// insert the dictionary page as the first page to write for this column
 	state.write_info.insert(state.write_info.begin(), std::move(write_info));
 }
```
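The guarded release above is a small peak-memory optimization: once the dictionary page has been compressed into compressed_buf, the uncompressed bytes held by temp_writer are dead weight and can be freed before the page is queued for writing. A rough sketch of the pattern, with PageWriteInfo and its fields as hypothetical stand-ins for the patch's write_info:

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for the per-page write_info in the patch.
struct PageWriteInfo {
	std::unique_ptr<std::vector<char>> temp_writer;    // uncompressed staging buffer
	std::unique_ptr<std::vector<char>> compressed_buf; // owns compressed bytes, if compression ran
	const char *compressed_data = nullptr;             // points into whichever buffer will be written
};

void ReleaseUncompressed(PageWriteInfo &info) {
	if (info.compressed_buf) {
		// the compressed copy is self-contained, so the staging buffer can go now
		assert(info.compressed_data == info.compressed_buf->data());
		info.temp_writer.reset();
	}
	// if compression was skipped, compressed_data aliases temp_writer,
	// which must stay alive until the page is actually written
}
```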

src/duckdb/src/function/table/version/pragma_version.cpp

Lines changed: 3 additions & 3 deletions

```diff
@@ -1,5 +1,5 @@
 #ifndef DUCKDB_PATCH_VERSION
-#define DUCKDB_PATCH_VERSION "0-dev2266"
+#define DUCKDB_PATCH_VERSION "0-dev2271"
 #endif
 #ifndef DUCKDB_MINOR_VERSION
 #define DUCKDB_MINOR_VERSION 3
@@ -8,10 +8,10 @@
 #define DUCKDB_MAJOR_VERSION 1
 #endif
 #ifndef DUCKDB_VERSION
-#define DUCKDB_VERSION "v1.3.0-dev2266"
+#define DUCKDB_VERSION "v1.3.0-dev2271"
 #endif
 #ifndef DUCKDB_SOURCE_ID
-#define DUCKDB_SOURCE_ID "6c176c0a3b"
+#define DUCKDB_SOURCE_ID "5141aa4560"
 #endif
 #include "duckdb/function/table/system_functions.hpp"
 #include "duckdb/main/database.hpp"
```

src/duckdb/src/include/duckdb/common/primitive_dictionary.hpp

Lines changed: 5 additions & 0 deletions

```diff
@@ -120,6 +120,11 @@ class PrimitiveDictionary {
 		return result;
 	}
 
+	void Reset() {
+		allocated_dictionary.Reset();
+		allocated_target.Reset();
+	}
+
 private:
 	//! Look up a value in the dictionary using linear probing
 	primitive_dictionary_entry_t &Lookup(const SRC &value) const {
```
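The new Reset() pairs with the templated_column_writer.hpp change above: as soon as StandardColumnWriter determines that dictionary encoding is off the table for a row group (the dictionary is empty or has overflowed), the dictionary's backing allocations can be returned immediately rather than lingering until the writer state is destroyed. A minimal sketch under the assumption that the two members are plain owning allocations; AllocatedBuffer here is a hypothetical stand-in:

```cpp
#include <cstddef>
#include <memory>

// Hypothetical stand-in for an allocation owned by the dictionary.
struct AllocatedBuffer {
	std::unique_ptr<char[]> data;
	size_t capacity = 0;
	void Reset() {
		data.reset(); // return the memory eagerly
		capacity = 0;
	}
};

class PrimitiveDictionarySketch {
public:
	//! Free both backing allocations once dictionary encoding is ruled out
	void Reset() {
		allocated_dictionary.Reset();
		allocated_target.Reset();
	}

private:
	AllocatedBuffer allocated_dictionary; // hash-table slots
	AllocatedBuffer allocated_target;     // materialized dictionary values
};
```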
