Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ option(KAMINPAR_ENABLE_THP "Use transparent huge pages for large memory allocati

option(KAMINPAR_BUILD_WITH_DEBUG_SYMBOLS "Always build with debug symbols, even in Release mode." ON)
option(KAMINPAR_BUILD_WITH_ASAN "Enable address sanitizer." OFF)
option(KAMINPAR_BUILD_WITH_TSAN "Enable thread sanitizer." OFF)
option(KAMINPAR_BUILD_WITH_UBSAN "Enable undefined behaviour sanitizer." OFF)
option(KAMINPAR_BUILD_WITH_PG "Build with the -pg option for profiling." OFF)

Expand Down Expand Up @@ -183,6 +184,11 @@ if(KAMINPAR_BUILD_WITH_ASAN)
add_link_options(-fsanitize=address)
endif()

if (KAMINPAR_BUILD_WITH_TSAN)
add_compile_options(-fsanitize=thread)
add_link_options(-fsanitize=thread)
endif()

if(KAMINPAR_BUILD_WITH_UBSAN)
add_compile_options(-fsanitize=undefined)
add_link_options(-fsanitize=undefined)
Expand Down
69 changes: 41 additions & 28 deletions kaminpar-common/datastructures/multi_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,26 @@
******************************************************************************/
#pragma once

#include <atomic>
#include <cstdint>
#include <random>
#include <vector>

#include <tbb/enumerable_thread_specific.h>

#include "kaminpar-common/datastructures/dynamic_binary_heap.h"
#include "kaminpar-common/logger.h"

namespace kaminpar {

template <typename ID, typename Key, template <typename> typename Comparator> class MultiQueue {
constexpr static Key kEmptyKey = Comparator<Key>::kMaxValue;

constexpr static std::uint8_t kLocked = 1u;
constexpr static std::uint8_t kUnlocked = 0u;

constexpr static std::size_t kNumPQsPerThread = 2;
constexpr static int kNumPopAttempts = 32;

SET_DEBUG(false);

struct Token {
Token(const int seed, const std::size_t num_pqs) : dist(0, num_pqs - 1) {
rng.seed(seed);
Expand Down Expand Up @@ -90,12 +93,13 @@ template <typename ID, typename Key, template <typename> typename Comparator> cl
for (int attempt = 0; attempt < kNumPopAttempts; ++attempt) {
const auto [first, second] = token.pick_two_random_pqs();

if (_pqs[first].empty() && _pqs[second].empty()) {
const bool first_empty = is_empty(first);
if (first_empty && is_empty(second)) {
continue;
}

const std::size_t pq =
(_pqs[first].empty() || _cmp(_top_keys[first], _top_keys[second])) ? second : first;
(first_empty || _cmp(top_key(first), top_key(second))) ? second : first;

if (!try_lock(pq)) {
continue;
Expand All @@ -115,9 +119,11 @@ template <typename ID, typename Key, template <typename> typename Comparator> cl
std::size_t best_pq = std::numeric_limits<std::size_t>::max();

for (std::size_t pq = 0; pq < _pqs.size(); ++pq) {
if (!_pqs[pq].empty() && _cmp(_top_keys[pq], best_key)) {
best_key = _top_keys[pq];
best_pq = pq;
if (!is_empty(pq)) {
if (const Key top = top_key(pq); _cmp(top, best_key)) {
best_key = top;
best_pq = pq;
}
}
}

Expand All @@ -128,6 +134,7 @@ template <typename ID, typename Key, template <typename> typename Comparator> cl
if (!try_lock(best_pq)) {
continue;
}

if (_pqs[best_pq].empty()) {
unlock(best_pq);
continue;
Expand All @@ -145,16 +152,11 @@ template <typename ID, typename Key, template <typename> typename Comparator> cl
pq = token.pick_random_pq();
} while (!try_lock(pq));

DBG << "Locked " << pq << ", state: " << _pq_locks;

return {pq, &_pqs[pq]};
}

void unlock(const Handle handle) {
KASSERT(!!handle);

DBG << "Unlock " << handle.index() << ", state: " << _pq_locks;

unlock(handle.index());
}

Expand All @@ -174,7 +176,7 @@ template <typename ID, typename Key, template <typename> typename Comparator> cl
_pq_locks.resize(num_pqs);

_top_keys.clear();
_top_keys.assign(num_pqs, Comparator<Key>::kMaxValue);
_top_keys.assign(num_pqs, AlignedKey{.value = kEmptyKey});

_token_ets.clear();
}
Expand All @@ -193,43 +195,54 @@ template <typename ID, typename Key, template <typename> typename Comparator> cl
}

private:
[[nodiscard]] bool is_locked(const std::size_t pq) const {
[[nodiscard]] bool is_locked(const std::size_t pq) {
KASSERT(pq < _pq_locks.size());
return std::atomic_ref<std::uint8_t>(_pq_locks[pq]).load(std::memory_order_relaxed) !=
kUnlocked;
}

[[nodiscard]] Key top_key(const std::size_t pq) {
KASSERT(pq < _pq_locks.size());
return std::atomic_ref<Key>(_top_keys[pq].value).load(std::memory_order_relaxed);
}

return __atomic_load_n(&_pq_locks[pq], __ATOMIC_RELAXED);
[[nodiscard]] bool is_empty(const std::size_t pq) {
return top_key(pq) == kEmptyKey;
}

bool try_lock(const std::size_t pq) {
KASSERT(pq < _pq_locks.size());

std::uint8_t zero = 0u;
std::uint8_t zero = kUnlocked;
return !is_locked(pq) &&
__atomic_compare_exchange_n(
&_pq_locks[pq], &zero, 1u, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED
&_pq_locks[pq], &zero, kLocked, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED
);
}

void unlock(const std::size_t pq) {
KASSERT(pq < _pq_locks.size());
KASSERT(is_locked(pq), "PQ " << pq << " expected to be locked, but is unlocked");

update_top_key(pq);
__atomic_store_n(&_pq_locks[pq], 0u, __ATOMIC_RELAXED);
}

void update_top_key(const std::size_t pq) {
KASSERT(is_locked(pq), "PQ " << pq << " expected to be locked, but is unlocked");

if (!_pqs[pq].empty()) {
_top_keys[pq] = _pqs[pq].peek_key();
std::atomic_ref<Key> top_key_ref(_top_keys[pq].value);
if (_pqs[pq].empty()) {
top_key_ref.store(kEmptyKey, std::memory_order_relaxed);
} else {
top_key_ref.store(_pqs[pq].peek_key(), std::memory_order_relaxed);
}

std::atomic_ref<std::uint8_t>(_pq_locks[pq]).store(kUnlocked, std::memory_order_relaxed);
}

int _seed;

struct alignas(std::atomic_ref<Key>::required_alignment) AlignedKey {
Key value;
};

std::vector<std::uint8_t> _pq_locks;
std::vector<PQ> _pqs;
std::vector<Key> _top_keys;
std::vector<AlignedKey> _top_keys;

tbb::enumerable_thread_specific<Token> _token_ets{[&] {
return Token(_seed, _pqs.size());
Expand Down
Loading