Skip to content

Margo-based implementation #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,18 @@ endif ()
set (CMAKE_PREFIX_PATH "" CACHE STRING "External dependencies path")
set (BUILD_SHARED_LIBS "OFF" CACHE BOOL "Build a shared library")

option (ENABLE_MARGO "Build with Margo support" OFF)

# Find required packages
find_package (mercury CONFIG REQUIRED)

if (${ENABLE_MARGO})
find_package (PkgConfig REQUIRED)
pkg_check_modules (margo REQUIRED IMPORTED_TARGET margo)
set (EXTRA_LIBS PkgConfig::margo)
else ()
set (EXTRA_LIBS)
endif ()

add_subdirectory (src)
add_subdirectory (tests)
31 changes: 31 additions & 0 deletions include/mercury-progressor/mercury-progressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@

typedef struct progressor_handle progressor_handle_t;

typedef struct margo_instance* margo_instance_id; /* avoid including margo.h if not present */

/*
* progressor_stats: run stats for a progressor and its handle.
* note that the stats are zeroed each time the threads are started.
Expand Down Expand Up @@ -74,6 +76,22 @@ extern "C" {
progressor_handle_t *mercury_progressor_init(hg_class_t *hgclass,
hg_context_t *hgcontext);

/**
* mercury_progressor_init_from_margo: allocate and init a mercury
* progressor and return a handle to it. The progressor will be
* created from a margo instance. Contrary to mercury_progressor_init,
* which does nto start the progress thread, the margo instance will
* already have a running progress thread.
*
* If mercury-progressor has not been built with Margo support,
* this function will fail and return NULL.
*
* @param mid margo instance to associate with the processor
*
* @return handle to progressor, or NULL (for allocation error)
*/
progressor_handle_t *mercury_progressor_init_from_margo(margo_instance_id mid);

/*
* mercury_progressor_get_progtimeout: get the current timeout setting
* that is used with HG_Progress() in the timeout loop.
Expand Down Expand Up @@ -178,6 +196,19 @@ hg_class_t *mercury_progressor_hgclass(progressor_handle_t *hand);
*/
hg_context_t *mercury_progressor_hgcontext(progressor_handle_t *hand);

/*
* mercury_progressor_hgclass: return margo_instance_id associated with
* the progressor our handle points to. clients can safely cache
* the return value while the progressor is allocated.
*
* If mercury-progressor was not build with Margo support, this function
* will return NULL.
*
* @param hand the progressor handle of interest
* @return the margo_instance_id
*/
margo_instance_id mercury_progressor_mid(progressor_handle_t *hand);

/*
* mercury_progressor_addrstring: return pointer to a C string containing
* the address of our local mercury address. valid as long as the
Expand Down
20 changes: 20 additions & 0 deletions spack.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# This spack.yaml file can be used to create a spack
# environment for testing purpose. From inside This
# directorty:
#
# ```
# $ spack env create -d .
# $ spack env activate .
# $ spack install
# ```
spack:
specs:
- cmake
- mercury~boostsys ^libfabric fabrics=tcp,rxm
- mochi-margo@main
concretizer:
unify: true
modules:
prefix_inspections:
lib: [LD_LIBRARY_PATH]
lib64: [LD_LIBRARY_PATH]
14 changes: 12 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,24 @@ set (MERCURY_PROGRESSOR_VERSION
# create library target (user can specify shared vs. static using
# BUILD_SHARED_LIBS). arrange for users of our lib to get the correct "-I"'s
#
add_library (mercury-progressor mercury-progressor.c)
if (${ENABLE_MARGO})
add_library (mercury-progressor margo-progressor.c)
else ()
add_library (mercury-progressor mercury-progressor.c)
endif ()
target_include_directories (mercury-progressor
PUBLIC $<INSTALL_INTERFACE:include>)

# configure config.h
configure_file ("config.h.in" "config.h" @ONLY)
# add the build directory as include path
target_include_directories (mercury-progressor BEFORE PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_BINARY_DIR}>)

# make sure our build includes are BEFORE a previously installed version
target_include_directories (mercury-progressor BEFORE PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/../include/mercury-progressor>)
target_link_libraries (mercury-progressor mercury)
target_link_libraries (mercury-progressor mercury ${EXTRA_LIBS})
# XXX: cmake 3.1 and newer define a Threads::Threads imported target
# that we should switch too when we are ready to require 3.1 or better.
# (3.1 was released late 2014)
Expand Down
6 changes: 6 additions & 0 deletions src/config.h.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#ifndef _CONFIG_H
#define _CONFIG_H

#cmakedefine ENABLE_MARGO

#endif
228 changes: 228 additions & 0 deletions src/margo-progressor.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Copyright (c) 2019-2023, Carnegie Mellon University.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the University nor the names of its contributors
* may be used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY
* WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

/*
* margo-progressor.c mercury progressor component implemented with margo
* 13-Jan-2023 [email protected]
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <margo.h>
#include <margo-util.h>

#include "mercury-progressor.h"

struct margo_progressor {
margo_instance_id mid;
_Atomic unsigned refcount;
int owns_mid;
char self_addr[128];
};

struct progressor_handle {
struct margo_progressor* pg;
_Atomic unsigned needed;
};

/*
* API functions!
*/

progressor_handle_t *mercury_progressor_init_from_margo(margo_instance_id mid) {
if(!mid) return NULL;

hg_addr_t addr = HG_ADDR_NULL;
hg_return_t ret = margo_addr_self(mid, &addr);
if(ret != HG_SUCCESS) return NULL;
char self_addr[128];
hg_size_t addr_size = 128;
ret = margo_addr_to_string(mid, self_addr, &addr_size, addr);
if(ret != HG_SUCCESS) return NULL;
ret = margo_addr_free(mid, addr);
if(ret != HG_SUCCESS) return NULL;

progressor_handle_t* pgh = calloc(1, sizeof(*pgh));
pgh->pg = calloc(1, sizeof(*(pgh->pg)));
pgh->pg->mid = mid;
pgh->pg->refcount = 1;
strcpy(pgh->pg->self_addr, self_addr);

return pgh;
}

margo_instance_id mercury_progressor_mid(progressor_handle_t *hand) {
if(!hand) return NULL;
return hand->pg->mid;
}

/*
* mercury_progressor_init: allocate and init progressor
*/
progressor_handle_t *mercury_progressor_init(hg_class_t *hgclass,
hg_context_t *hgcontext) {
struct margo_init_info init_info = {0};
init_info.hg_class = hgclass;
init_info.hg_context = hgcontext;
init_info.json_config = "{\"use_progress_thread\":true,\"rpc_thread_count\":1}";
margo_instance_id mid = margo_init_ext(
HG_Class_get_protocol(hgclass),
HG_Class_is_listening(hgclass),
&init_info);
if(!mid) return NULL;
progressor_handle_t* pgh = mercury_progressor_init_from_margo(mid);
if(!pgh) return NULL;
pgh->pg->owns_mid = 1;
return pgh;
}

/*
* mercury_progressor_get_progtimeout: get progtimeout (msec)
*/
unsigned int mercury_progressor_get_progtimeout(progressor_handle_t *hand) {
if(!hand) return 0;
unsigned int timeout = 0;
margo_get_progress_timeout_ub_msec(hand->pg->mid, &timeout);
return timeout;
}

/*
* mercury_progressor_set_progtimeout: set progtimeout (msec)
*/
void mercury_progressor_set_progtimeout(progressor_handle_t *hand,
unsigned int timeout) {
if(!hand) return;
margo_set_progress_timeout_ub_msec(hand->pg->mid, timeout);
}

/*
* mercury_progressor_duphandle: create a duplicate handle.
*/
progressor_handle_t *mercury_progressor_duphandle(progressor_handle_t *hand) {
if(!hand) return NULL;
progressor_handle_t* new_pgh = calloc(1, sizeof(*new_pgh));
new_pgh->pg = hand->pg;
new_pgh->pg->refcount++;
return new_pgh;
}

/*
* mercury_progressor_freehandle: free a progressor handle. the
* caller should be holding the last reference to the handle.
*/
hg_return_t mercury_progressor_freehandle(progressor_handle_t *hand) {
if(!hand) return HG_SUCCESS;
if(--hand->pg->refcount == 0) {
if(hand->pg->owns_mid)
margo_finalize(hand->pg->mid);
free(hand->pg);
}
free(hand);
return HG_SUCCESS;
}

/*
* mercury_progressor_getstats: get stats from progress thread
*/
hg_return_t mercury_progressor_getstats(progressor_handle_t *hand,
struct progressor_stats *ps) {
if(!hand) return HG_INVALID_ARG;
if(!ps) return HG_SUCCESS;

margo_instance_id mid = hand->pg->mid;

memset(ps, 0, sizeof(*ps));
ps->is_running = 1; /* margo is always running */
ps->needed = hand->needed;
ps->progressor_refcnt = hand->pg->refcount;
ps->nprogress = margo_get_num_progress_calls(mid);
ps->ntrigger = margo_get_num_trigger_calls(mid);
// note: can't fill in runtime and runusage

return HG_SUCCESS;
}

/*
* mercury_progressor_nprogress: extract nprogress from progressor
*/
uint64_t mercury_progressor_nprogress(progressor_handle_t *hand) {
return hand ? margo_get_num_progress_calls(hand->pg->mid) : 0;
}

/*
* mercury_progressor_ntrigger: extract ntrigger from progressor
*/
uint64_t mercury_progressor_ntrigger(progressor_handle_t *hand) {
return hand ? margo_get_num_trigger_calls(hand->pg->mid) : 0;
}

/*
* mercury_progressor_hgclass: extract hg_class_t* from progressor
*/
hg_class_t *mercury_progressor_hgclass(progressor_handle_t *hand) {
return hand ? margo_get_class(hand->pg->mid) : NULL;
}

/*
* mercury_progressor_hgcontext: extract hg_context_t* from progressor
*/
hg_context_t *mercury_progressor_hgcontext(progressor_handle_t *hand) {
return hand ? margo_get_context(hand->pg->mid) : NULL;
}

/*
* mercury_progressor_addrstring: extract local address string.
* valid as long as handle is active, no need to free return value.
*/
char *mercury_progressor_addrstring(progressor_handle_t *hand) {
return hand ? hand->pg->self_addr : NULL;
}

/*
* mercury_progressor_needed: handle needs threads to be running
*/
hg_return_t mercury_progressor_needed(progressor_handle_t *hand) {
if(!hand) return HG_INVALID_ARG;
++hand->needed;
return HG_SUCCESS;
}

/*
* mercury_progressor_idle: progressor handle no longer needs progressor
* service running.
*/
hg_return_t mercury_progressor_idle(progressor_handle_t *hand) {
if(!hand) return HG_INVALID_ARG;
--hand->needed;
return HG_SUCCESS;
}
7 changes: 7 additions & 0 deletions src/mercury-progressor-config.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,11 @@ find_dependency(mercury)

include ("${CMAKE_CURRENT_LIST_DIR}/mercury-progressor-targets.cmake")

set (MERCURY_PROGRESSOR_USES_MARGO @ENABLE_MARGO@)

if (MERCURY_PROGRESSOR_USES_MARGO)
find_dependency(PkgConfig)
pkg_check_modules (margo REQUIRED IMPORTED_TARGET margo)
endif ()

# could include a macros file if one is used
10 changes: 10 additions & 0 deletions src/mercury-progressor.c
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ static void load_stats(struct progressor_stats *ps, struct progressor *pg) {
* API functions!
*/

progressor_handle_t *mercury_progressor_init_from_margo(margo_instance_id mid) {
(void)mid;
return NULL;
}

margo_instance_id mercury_progressor_mid(progressor_handle_t *hand) {
(void)hand;
return NULL;
}

/*
* mercury_progressor_init: allocate and init progressor
*/
Expand Down
12 changes: 9 additions & 3 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
# 29-Oct-2019 [email protected]
#

add_executable(test-progressor test-progressor.c)
target_link_libraries(test-progressor mercury-progressor)
add_test(test-progressor test-progressor)
if (${ENABLE_MARGO})
add_executable (test-margo-progressor test-margo-progressor.c)
target_link_libraries (test-margo-progressor mercury-progressor)
add_test (test-margo-progressor test-margo-progressor)
endif ()

add_executable (test-progressor test-progressor.c)
target_link_libraries (test-progressor mercury-progressor)
add_test (test-progressor test-progressor)
Loading