Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
branches: [ master ]

env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
BUILD_TYPE: Release
Expand Down Expand Up @@ -58,5 +59,5 @@ jobs:
working-directory: ${{github.workspace}}/build/test
# Execute tests defined by the CMake configuration.
# See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail
run: ctest
run: ctest --output-on-failure

87 changes: 57 additions & 30 deletions include/CXXGraph/Partitioning/CoordinatedPartitionState.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,51 @@ class CoordinatedPartitionState : public PartitionState<T> {
PartitionMap<T> partition_map;
Globals GLOBALS;
int MAX_LOAD;
std::shared_ptr<std::mutex> machines_load_edges_mutex = nullptr;
std::shared_ptr<std::mutex> machines_load_vertices_mutex = nullptr;
std::shared_ptr<std::mutex> machines_weight_edges_mutex = nullptr;
std::shared_ptr<std::mutex> record_map_mutex = nullptr;
mutable std::mutex machines_load_edges_mutex;
mutable std::mutex machines_load_vertices_mutex;
mutable std::mutex machines_weight_edges_mutex;
mutable std::mutex record_map_mutex;
// DatWriter out; //to print the final partition of each edge
public:
CoordinatedPartitionState(const Globals &G);
~CoordinatedPartitionState();

// Disable copy
CoordinatedPartitionState(const CoordinatedPartitionState<T>&) = delete;
CoordinatedPartitionState<T>& operator=(const CoordinatedPartitionState<T>&) = delete;

// Custom move constructor
CoordinatedPartitionState(CoordinatedPartitionState<T>&& other) noexcept
: record_map(std::move(other.record_map)),
machines_load_edges(std::move(other.machines_load_edges)),
machines_weight_edges(std::move(other.machines_weight_edges)),
machines_load_vertices(std::move(other.machines_load_vertices)),
partition_map(std::move(other.partition_map)),
GLOBALS(std::move(other.GLOBALS)),
MAX_LOAD(other.MAX_LOAD)
// mutexes are default-constructed
{}

// Custom move assignment
CoordinatedPartitionState<T>& operator=(CoordinatedPartitionState<T>&& other) noexcept {
if (this != &other) {
std::lock_guard<std::mutex> lock1(machines_load_edges_mutex);
std::lock_guard<std::mutex> lock2(machines_load_vertices_mutex);
std::lock_guard<std::mutex> lock3(machines_weight_edges_mutex);
std::lock_guard<std::mutex> lock4(record_map_mutex);

record_map = std::move(other.record_map);
machines_load_edges = std::move(other.machines_load_edges);
machines_weight_edges = std::move(other.machines_weight_edges);
machines_load_vertices = std::move(other.machines_load_vertices);
partition_map = std::move(other.partition_map);
GLOBALS = std::move(other.GLOBALS);
MAX_LOAD = other.MAX_LOAD;
// mutexes are default-constructed
}
return *this;
}

std::shared_ptr<Record<T>> getRecord(CXXGraph::id_t x) override;
int getMachineLoad(const int m) const override;
int getMachineWeight(const int m) const override;
Expand All @@ -86,11 +122,7 @@ class CoordinatedPartitionState : public PartitionState<T> {
template <typename T>
CoordinatedPartitionState<T>::CoordinatedPartitionState(const Globals &G)
: record_map(),
GLOBALS(G),
machines_load_edges_mutex(std::make_shared<std::mutex>()),
machines_load_vertices_mutex(std::make_shared<std::mutex>()),
machines_weight_edges_mutex(std::make_shared<std::mutex>()),
record_map_mutex(std::make_shared<std::mutex>()) {
GLOBALS(G) {
machines_load_edges.reserve(GLOBALS.numberOfPartition);
machines_load_vertices.reserve(GLOBALS.numberOfPartition);
machines_weight_edges.reserve(GLOBALS.numberOfPartition);
Expand All @@ -104,12 +136,12 @@ CoordinatedPartitionState<T>::CoordinatedPartitionState(const Globals &G)
}

template <typename T>
CoordinatedPartitionState<T>::~CoordinatedPartitionState() {}
CoordinatedPartitionState<T>::~CoordinatedPartitionState() = default;

template <typename T>
std::shared_ptr<Record<T>> CoordinatedPartitionState<T>::getRecord(
CXXGraph::id_t x) {
std::lock_guard<std::mutex> lock(*record_map_mutex);
std::lock_guard<std::mutex> lock(record_map_mutex);
if (record_map.find(x) == record_map.end()) {
record_map[x] = std::make_shared<CoordinatedRecord<T>>();
}
Expand All @@ -118,25 +150,25 @@ std::shared_ptr<Record<T>> CoordinatedPartitionState<T>::getRecord(

template <typename T>
int CoordinatedPartitionState<T>::getMachineLoad(const int m) const {
std::lock_guard<std::mutex> lock(*machines_load_edges_mutex);
std::lock_guard<std::mutex> lock(machines_load_edges_mutex);
return (int)machines_load_edges.at(m);
}

template <typename T>
int CoordinatedPartitionState<T>::getMachineWeight(const int m) const {
std::lock_guard<std::mutex> lock(*machines_weight_edges_mutex);
std::lock_guard<std::mutex> lock(machines_weight_edges_mutex);
return (int)machines_weight_edges.at(m);
}

template <typename T>
int CoordinatedPartitionState<T>::getMachineLoadVertices(const int m) const {
std::lock_guard<std::mutex> lock(*machines_load_vertices_mutex);
std::lock_guard<std::mutex> lock(machines_load_vertices_mutex);
return (int)machines_load_vertices.at(m);
}
template <typename T>
void CoordinatedPartitionState<T>::incrementMachineLoad(
const int m, shared<const Edge<T>> e) {
std::lock_guard<std::mutex> lock(*machines_load_edges_mutex);
std::lock_guard<std::mutex> lock(machines_load_edges_mutex);
machines_load_edges[m] = machines_load_edges[m] + 1;
int new_value = machines_load_edges.at(m);
if (new_value > MAX_LOAD) {
Expand All @@ -147,22 +179,17 @@ void CoordinatedPartitionState<T>::incrementMachineLoad(
template <typename T>
void CoordinatedPartitionState<T>::incrementMachineWeight(
const int m, shared<const Edge<T>> e) {
std::lock_guard<std::mutex> lock(*machines_weight_edges_mutex);
std::lock_guard<std::mutex> lock(machines_weight_edges_mutex);
double edge_weight = CXXGraph::NEGLIGIBLE_WEIGHT;
if (e->isWeighted().has_value() && e->isWeighted().value()) {
edge_weight = (std::dynamic_pointer_cast<const Weighted>(e))->getWeight();
}
machines_weight_edges[m] = machines_weight_edges[m] + edge_weight;
// double new_value = machines_weight_edges[m];
// if (new_value > MAX_LOAD)
//{
// MAX_LOAD = new_value;
//}
partition_map[m]->addEdge(e);
}
template <typename T>
int CoordinatedPartitionState<T>::getMinLoad() const {
std::lock_guard<std::mutex> lock(*machines_load_edges_mutex);
std::lock_guard<std::mutex> lock(machines_load_edges_mutex);
int MIN_LOAD = std::numeric_limits<int>::max();
for (const auto &machines_load_edges_it : machines_load_edges) {
int loadi = machines_load_edges_it;
Expand All @@ -178,7 +205,7 @@ int CoordinatedPartitionState<T>::getMaxLoad() const {
}
template <typename T>
int CoordinatedPartitionState<T>::getMachineWithMinWeight() const {
std::lock_guard<std::mutex> lock(*machines_weight_edges_mutex);
std::lock_guard<std::mutex> lock(machines_weight_edges_mutex);

double MIN_LOAD = std::numeric_limits<double>::max();
int machine_id = 0;
Expand All @@ -195,7 +222,7 @@ int CoordinatedPartitionState<T>::getMachineWithMinWeight() const {
template <typename T>
int CoordinatedPartitionState<T>::getMachineWithMinWeight(
const std::set<int> &partitions) const {
std::lock_guard<std::mutex> lock(*machines_weight_edges_mutex);
std::lock_guard<std::mutex> lock(machines_weight_edges_mutex);

double MIN_LOAD = std::numeric_limits<double>::max();
int machine_id = 0;
Expand All @@ -211,7 +238,7 @@ int CoordinatedPartitionState<T>::getMachineWithMinWeight(
}
template <typename T>
std::vector<int> CoordinatedPartitionState<T>::getMachines_load() const {
std::lock_guard<std::mutex> lock(*machines_load_edges_mutex);
std::lock_guard<std::mutex> lock(machines_load_edges_mutex);
std::vector<int> result;
for (const auto &machines_load_edges_it : machines_load_edges) {
result.push_back(machines_load_edges_it);
Expand All @@ -221,7 +248,7 @@ std::vector<int> CoordinatedPartitionState<T>::getMachines_load() const {
template <typename T>
size_t CoordinatedPartitionState<T>::getTotalReplicas() const {
// TODO
std::lock_guard<std::mutex> lock(*record_map_mutex);
std::lock_guard<std::mutex> lock(record_map_mutex);
size_t result = 0;
for (const auto &record_map_it : record_map) {
size_t r = record_map_it.second->getReplicas();
Expand All @@ -235,12 +262,12 @@ size_t CoordinatedPartitionState<T>::getTotalReplicas() const {
}
template <typename T>
size_t CoordinatedPartitionState<T>::getNumVertices() const {
std::lock_guard<std::mutex> lock(*record_map_mutex);
std::lock_guard<std::mutex> lock(record_map_mutex);
return (size_t)record_map.size();
}
template <typename T>
std::set<CXXGraph::id_t> CoordinatedPartitionState<T>::getVertexIds() const {
std::lock_guard<std::mutex> lock(*record_map_mutex);
std::lock_guard<std::mutex> lock(record_map_mutex);
// if (GLOBALS.OUTPUT_FILE_NAME!=null){ out.close(); }
std::set<CXXGraph::id_t> result;
for (const auto &record_map_it : record_map) {
Expand All @@ -250,13 +277,13 @@ std::set<CXXGraph::id_t> CoordinatedPartitionState<T>::getVertexIds() const {
}
template <typename T>
void CoordinatedPartitionState<T>::incrementMachineLoadVertices(const int m) {
std::lock_guard<std::mutex> lock(*machines_load_vertices_mutex);
std::lock_guard<std::mutex> lock(machines_load_vertices_mutex);
machines_load_vertices[m] = machines_load_vertices[m] + 1;
}
template <typename T>
std::vector<int> CoordinatedPartitionState<T>::getMachines_loadVertices()
const {
std::lock_guard<std::mutex> lock(*machines_load_vertices_mutex);
std::lock_guard<std::mutex> lock(machines_load_vertices_mutex);
std::vector<int> result;
for (const auto &machines_load_vertices_it : machines_load_vertices) {
result.push_back(machines_load_vertices_it);
Expand Down
33 changes: 12 additions & 21 deletions include/CXXGraph/Partitioning/Partitioner.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
/*** License: MPL v2.0 ***/
/***********************************************************/

#ifndef __CXXGRAPH_PARTITIONING_PARTITIONER_H__
#define __CXXGRAPH_PARTITIONING_PARTITIONER_H__
#ifndef CXXGRAPH_PARTITIONING_PARTITIONER_H_
#define CXXGRAPH_PARTITIONING_PARTITIONER_H_

#include <memory>
#pragma once
#include <memory>
#include <vector>

#include "CXXGraph/Edge/Edge.h"
Expand Down Expand Up @@ -170,6 +170,7 @@ Partitioner<T>::Partitioner(const Partitioner &other) {
template <typename T>
CoordinatedPartitionState<T> Partitioner<T>::startCoordinated() {
CoordinatedPartitionState<T> state(GLOBALS);
auto shared_state = make_shared<CoordinatedPartitionState<T>>(std::move(state));
int processors = GLOBALS.threads;

std::vector<std::thread> myThreads(processors);
Expand All @@ -184,28 +185,18 @@ CoordinatedPartitionState<T> Partitioner<T>::startCoordinated() {
list_vector[t] = std::vector<shared<const Edge<T>>>(
std::next(dataset->begin(), iStart),
std::next(dataset->begin(), iEnd));
myRunnable[t].reset(new PartitionerThread<T>(
list_vector[t], make_shared<CoordinatedPartitionState<T>>(state),
algorithm));
myRunnable[t].reset(
new PartitionerThread<T>(list_vector[t], shared_state, algorithm));
myThreads[t] = std::thread(&Runnable::run, myRunnable[t].get());
}
}
for (int t = 0; t < processors; ++t) {
if (myThreads[t].joinable()) {
myThreads[t].join();
}
// if(myRunnable[t] != nullptr){
// delete myRunnable[t];
//}
myThreads[t].join();
}
/*
for (int t = 0; t < processors; ++t){
if (runnableList[t] != nullptr){
delete runnableList[t];
}
}
*/
return state;

// new shared state is move constructed then returned
// (so has no effect on the shared ponter)
return std::move(*shared_state);
}

template <typename T>
Expand Down Expand Up @@ -235,4 +226,4 @@ PartitionMap<T> Partitioner<T>::partitionGraph(
} // namespace Partitioning
} // namespace CXXGraph

#endif // __CXXGRAPH_PARTITIONING_PARTITIONER_H__
#endif // CXXGRAPH_PARTITIONING_PARTITIONER_H_
Loading