Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Index structure for free buffers of a slab #688

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/Buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
namespace Buffer_Namespace {

Buffer::Buffer(BufferMgr* bm,
BufferList::iterator seg_it,
SegmentList::iterator seg_it,
const int device_id,
const size_t page_size,
const size_t num_bytes)
Expand Down
4 changes: 2 additions & 2 deletions omniscidb/DataMgr/BufferMgr/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Buffer : public AbstractBuffer {
*/

Buffer(BufferMgr* bm,
BufferList::iterator seg_it,
SegmentList::iterator seg_it,
const int device_id,
const size_t page_size = 512,
const size_t num_bytes = 0);
Expand Down Expand Up @@ -178,7 +178,7 @@ class Buffer : public AbstractBuffer {
const int src_device_id = -1) = 0;

BufferMgr* bm_;
BufferList::iterator seg_it_;
SegmentList::iterator seg_it_;
size_t page_size_; /// the size of each page in the buffer
size_t num_pages_;
int epoch_; /// indicates when the buffer was last flushed
Expand Down
227 changes: 149 additions & 78 deletions omniscidb/DataMgr/BufferMgr/BufferMgr.cpp

Large diffs are not rendered by default.

76 changes: 61 additions & 15 deletions omniscidb/DataMgr/BufferMgr/BufferMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,52 @@ class TooBigForSlab : public OutOfMemory {
using namespace Data_Namespace;

namespace Buffer_Namespace {
/**
 * @brief Segment list of a single slab together with an ordered index over
 * some of its segments (the "free segment index").
 *
 * The index stores iterators into `segments_`, ordered by page count first and
 * start page second, so a segment with at least a given number of pages can be
 * located without walking the whole list.
 *
 * Because the index holds iterators into this object's own list, a member-wise
 * copy would duplicate `segments_` while the copied index kept pointing into
 * the source's list. Copying is therefore disabled. Moving stays enabled:
 * moving a `std::list` transfers its nodes, so iterators to elements stay
 * valid and refer to the moved-to list. With copying deleted, containers such
 * as `std::vector<Slab>` always relocate elements by move.
 */
struct Slab {
 private:
  /**
   * @brief Heterogeneous "less than" between an indexed segment and a raw page
   * count.
   * @return true when the segment referenced by @p lhs has fewer pages than
   * @p num_pages.
   *
   * NOTE(review): presumably used by the out-of-line members for
   * lower_bound-style lookups; its callers are not visible in this header.
   */
  static bool iterator_page_count_cmp_l(const SegmentList::iterator& lhs,
                                        const size_t num_pages) {
    return lhs->num_pages < num_pages;
  }

  /**
   * @brief Strict weak ordering for index entries: ascending `num_pages`, ties
   * broken by ascending `start_page`.
   *
   * Both iterators are dereferenced, so only dereferenceable iterators (never
   * `end()`) may be stored in the index.
   */
  struct SegmentListIterCmp {
    bool operator()(const SegmentList::iterator& lhs,
                    const SegmentList::iterator& rhs) const {
      if (lhs->num_pages < rhs->num_pages) {
        return true;
      }
      if (lhs->num_pages > rhs->num_pages) {
        return false;
      }
      return (lhs->start_page < rhs->start_page);
    }
  };

  /// Index over segments of `segments_`, ordered by (num_pages, start_page).
  std::set<SegmentList::iterator, SegmentListIterCmp> free_segment_index_;
  /// All segments of this slab.
  SegmentList segments_;

 public:
  /**
   * @brief Creates a slab consisting of one segment built as
   * `BufferSeg(0, page_count)` and registers that segment in the index.
   * @param page_count Second argument forwarded to the initial `BufferSeg`.
   */
  Slab(const size_t page_count) : segments_(SegmentList{BufferSeg(0, page_count)}) {
    free_segment_index_.insert(segments_.begin());
  }

  // Copying would leave the copied index referring to the source's list nodes.
  Slab(const Slab&) = delete;
  Slab& operator=(const Slab&) = delete;
  // Moving keeps the stored iterators valid (list nodes are transferred).
  Slab(Slab&&) = default;
  Slab& operator=(Slab&&) = default;

  /// Defined out of line. NOTE(review): presumably returns an indexed segment
  /// with at least @p num_requested_pages pages — confirm in BufferMgr.cpp.
  SegmentList::iterator getFreeSegment(const size_t num_requested_pages);

  /// Defined out of line. NOTE(review): presumably inserts @p data_seg into the
  /// segment list at @p slab_buffer_pos — confirm in BufferMgr.cpp.
  SegmentList::iterator insert(SegmentList::iterator& slab_buffer_pos,
                               BufferSeg& data_seg);

  /// Defined out of line. NOTE(review): presumably erases the segment at
  /// @p slab_buffer_pos — confirm in BufferMgr.cpp.
  SegmentList::iterator remove(SegmentList::iterator slab_buffer_pos);

  /// Index maintenance helpers; defined out of line.
  void index_insert(SegmentList::iterator& to_insert);
  void index_remove(SegmentList::iterator& at);
  void verify_index();

  /// Iteration over every segment of the slab.
  SegmentList::iterator begin() { return segments_.begin(); }
  SegmentList::iterator end() { return segments_.end(); }

  /// Read-only view of the index.
  const std::set<SegmentList::iterator, SegmentListIterCmp>& getFreeSegments() const {
    return free_segment_index_;
  }
  /// Read-only view of the underlying segment list the index refers to.
  const SegmentList& getListToIndex() const { return segments_; }
};

struct MemoryData {
size_t slabNum;
Expand Down Expand Up @@ -132,7 +178,7 @@ class BufferMgr : public AbstractBufferMgr { // implements
void clearSlabs();
std::string printMap();
void printSegs();
std::string printSeg(BufferList::iterator& seg_it);
std::string printSeg(SegmentList::iterator& seg_it);
std::string keyToString(const ChunkKey& key);
size_t getInUseSize() override;
size_t getMaxSize() override;
Expand All @@ -141,7 +187,6 @@ class BufferMgr : public AbstractBufferMgr { // implements
size_t getMaxSlabSize();
size_t getPageSize();
bool isAllocationCapped() override;
const std::vector<BufferList>& getSlabSegments();

/// Creates a chunk with the specified key and page size.
AbstractBuffer* createBuffer(const ChunkKey& key,
Expand Down Expand Up @@ -193,8 +238,8 @@ class BufferMgr : public AbstractBufferMgr { // implements
size_t size();
size_t getNumChunks() override;

BufferList::iterator reserveBuffer(BufferList::iterator& seg_it,
const size_t num_bytes);
SegmentList::iterator reserveBuffer(SegmentList::iterator& seg_it,
const size_t num_bytes);
void getChunkMetadataVecForKeyPrefix(ChunkMetadataVector& chunk_metadata_vec,
const ChunkKey& key_prefix) override;

Expand All @@ -210,18 +255,18 @@ class BufferMgr : public AbstractBufferMgr { // implements
const size_t page_size_;
std::vector<int8_t*> slabs_; /// vector of beginning memory addresses for each
/// allocation of the buffer pool
std::vector<BufferList> slab_segments_;
std::vector<Slab> slab_segments_;

private:
BufferMgr(const BufferMgr&); // private copy constructor
BufferMgr& operator=(const BufferMgr&); // private assignment
void removeSegment(BufferList::iterator& seg_it);
BufferList::iterator findFreeBufferInSlab(const size_t slab_num,
const size_t num_pages_requested);
void removeSegment(SegmentList::iterator& seg_it);
SegmentList::iterator findFreeSegmentInSlab(const size_t slab_num,
const size_t num_pages_requested);
int getBufferId();
virtual void addSlab(const size_t slab_size) = 0;
virtual void freeAllMem() = 0;
virtual void allocateBuffer(BufferList::iterator seg_it,
virtual void allocateBuffer(SegmentList::iterator seg_it,
const size_t page_size,
const size_t num_bytes) = 0;
virtual AbstractBuffer* allocateZeroCopyBuffer(
Expand All @@ -241,7 +286,7 @@ class BufferMgr : public AbstractBufferMgr { // implements
// to this map should be synced through chunk_index_mutex_.
std::map<ChunkKey, std::shared_ptr<std::condition_variable>> in_progress_buffer_cvs_;

std::map<ChunkKey, BufferList::iterator> chunk_index_;
std::map<ChunkKey, SegmentList::iterator> chunk_index_;
size_t max_buffer_pool_num_pages_; // max number of pages for buffer pool
size_t num_pages_allocated_;
size_t min_num_pages_per_slab_;
Expand All @@ -252,11 +297,12 @@ class BufferMgr : public AbstractBufferMgr { // implements
int max_buffer_id_;
unsigned int buffer_epoch_;

BufferList unsized_segs_;
SegmentList unsized_segs_;

SegmentList::iterator evict(SegmentList::iterator& evict_start,
const size_t num_pages_requested,
const int slab_num);

BufferList::iterator evict(BufferList::iterator& evict_start,
const size_t num_pages_requested,
const int slab_num);
/**
* @brief Gets a buffer of required size and returns an iterator to it
*
Expand All @@ -270,7 +316,7 @@ class BufferMgr : public AbstractBufferMgr { // implements
* USED if applicable
*
*/
BufferList::iterator findFreeBuffer(size_t num_bytes);
SegmentList::iterator findFreeSegment(size_t num_bytes);
};

} // namespace Buffer_Namespace
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/BufferSeg.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@ struct BufferSeg {
, last_touched(last_touched) {}
};

using BufferList = std::list<BufferSeg>;
using SegmentList = std::list<BufferSeg>;
} // namespace Buffer_Namespace
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
namespace Buffer_Namespace {

CpuBuffer::CpuBuffer(BufferMgr* bm,
BufferList::iterator segment_iter,
SegmentList::iterator segment_iter,
const int device_id,
GpuMgr* gpu_mgr,
const size_t page_size,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace Buffer_Namespace {
class CpuBuffer : public Buffer {
public:
CpuBuffer(BufferMgr* bm,
BufferList::iterator segment_iter,
SegmentList::iterator segment_iter,
const int device_id,
GpuMgr* gpu_mgr,
const size_t page_size = 512,
Expand Down
6 changes: 2 additions & 4 deletions omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,15 @@ void CpuBufferMgr::addSlab(const size_t slab_size) {
slabs_.resize(slabs_.size() - 1);
throw FailedToCreateSlab(slab_size);
}
slab_segments_.resize(slab_segments_.size() + 1);
slab_segments_[slab_segments_.size() - 1].push_back(
BufferSeg(0, slab_size / page_size_));
slab_segments_.emplace_back(slab_size / page_size_);
}

void CpuBufferMgr::freeAllMem() {
CHECK(allocator_);
initializeMem();
}

void CpuBufferMgr::allocateBuffer(BufferList::iterator seg_it,
void CpuBufferMgr::allocateBuffer(SegmentList::iterator seg_it,
const size_t page_size,
const size_t initial_size) {
new CpuBuffer(this,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/CpuBufferMgr/CpuBufferMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class CpuBufferMgr : public BufferMgr {
protected:
void addSlab(const size_t slab_size) override;
void freeAllMem() override;
void allocateBuffer(BufferList::iterator segment_iter,
void allocateBuffer(SegmentList::iterator segment_iter,
const size_t page_size,
const size_t initial_size) override;
virtual void initializeMem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ void TieredCpuBufferMgr::addSlab(const size_t slab_size) {
}
if (allocated_slab) {
// We allocated a new slab, so add segments for it.
slab_segments_.resize(slab_segments_.size() + 1);
slab_segments_[slab_segments_.size() - 1].push_back(
BufferSeg(0, slab_size / page_size_));
slab_segments_.emplace_back(slab_size / page_size_);
LOG(INFO) << "Allocated slab using " << tier_to_string(last_tier) << ".";
} else {
// None of the allocators allocated a slab, so revert to original size and throw.
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
namespace Buffer_Namespace {

GpuBuffer::GpuBuffer(BufferMgr* bm,
BufferList::iterator seg_it,
SegmentList::iterator seg_it,
const int device_id,
GpuMgr* gpu_mgr,
const size_t page_size,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace Buffer_Namespace {
class GpuBuffer : public Buffer {
public:
GpuBuffer(BufferMgr* bm,
BufferList::iterator seg_it,
SegmentList::iterator seg_it,
const int device_id,
GpuMgr* gpu_mgr,
const size_t page_size = 512,
Expand Down
6 changes: 2 additions & 4 deletions omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ void GpuBufferMgr::addSlab(const size_t slab_size) {
slabs_.resize(slabs_.size() - 1);
throw FailedToCreateSlab(slab_size);
}
slab_segments_.resize(slab_segments_.size() + 1);
slab_segments_[slab_segments_.size() - 1].push_back(
BufferSeg(0, slab_size / page_size_));
slab_segments_.emplace_back(slab_size / page_size_);
}

void GpuBufferMgr::freeAllMem() {
Expand All @@ -74,7 +72,7 @@ void GpuBufferMgr::freeAllMem() {
}
}

void GpuBufferMgr::allocateBuffer(BufferList::iterator seg_it,
void GpuBufferMgr::allocateBuffer(SegmentList::iterator seg_it,
const size_t page_size,
const size_t initial_size) {
new GpuBuffer(this,
Expand Down
2 changes: 1 addition & 1 deletion omniscidb/DataMgr/BufferMgr/GpuBufferMgr/GpuBufferMgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class GpuBufferMgr : public BufferMgr {
private:
void addSlab(const size_t slab_size) override;
void freeAllMem() override;
void allocateBuffer(BufferList::iterator seg_it,
void allocateBuffer(SegmentList::iterator seg_it,
const size_t page_size,
const size_t initial_size) override;
GpuMgr* gpu_mgr_;
Expand Down
4 changes: 3 additions & 1 deletion omniscidb/QueryEngine/RelAlgExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1448,11 +1448,12 @@ ExecutionResult RelAlgExecutor::executeWorkUnit(
}
const auto body = work_unit.body;
CHECK(body);
auto timer1 = DEBUG_TIMER("get_table_infos");
const auto table_infos = get_table_infos(work_unit.exe_unit, executor_);
auto timer2 = DEBUG_TIMER("ALL");

auto ra_exe_unit = decide_approx_count_distinct_implementation(
work_unit.exe_unit, table_infos, executor_, co.device_type, target_exprs_owned_);

auto max_groups_buffer_entry_guess = work_unit.max_groups_buffer_entry_guess;
if (is_window_execution_unit(ra_exe_unit)) {
CHECK_EQ(table_infos.size(), size_t(1));
Expand Down Expand Up @@ -1484,6 +1485,7 @@ ExecutionResult RelAlgExecutor::executeWorkUnit(
eo.output_columnar_hint = true;
}
}
timer2.stop();

ExecutionResult result;
auto execute_and_handle_errors = [&](const auto max_groups_buffer_entry_guess_in,
Expand Down