From 73d94b863ef556baddc3bf2e529c960b40b1a5ab Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Fri, 13 Jan 2023 11:23:56 +0000 Subject: [PATCH 1/6] implemented progressor based on margo --- CMakeLists.txt | 10 + .../mercury-progressor/mercury-progressor.h | 31 +++ spack.yaml | 20 ++ src/CMakeLists.txt | 14 +- src/config.h.in | 6 + src/margo-progressor.c | 226 ++++++++++++++++++ src/mercury-progressor-config.cmake.in | 7 + src/mercury-progressor.c | 10 + tests/CMakeLists.txt | 12 +- tests/test-margo-progressor.c | 153 ++++++++++++ tests/test-progressor.c | 8 + 11 files changed, 492 insertions(+), 5 deletions(-) create mode 100644 spack.yaml create mode 100644 src/config.h.in create mode 100644 src/margo-progressor.c create mode 100644 tests/test-margo-progressor.c diff --git a/CMakeLists.txt b/CMakeLists.txt index cd0b8f1..840de06 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,8 +39,18 @@ endif () set (CMAKE_PREFIX_PATH "" CACHE STRING "External dependencies path") set (BUILD_SHARED_LIBS "OFF" CACHE BOOL "Build a shared library") +option (ENABLE_MARGO "Build with Margo support" OFF) + # Find required packages find_package (mercury CONFIG REQUIRED) +if (${ENABLE_MARGO}) + find_package (PkgConfig REQUIRED) + pkg_check_modules (margo REQUIRED IMPORTED_TARGET margo) + set (EXTRA_LIBS PkgConfig::margo) +else () + set (EXTRA_LIBS) +endif () + add_subdirectory (src) add_subdirectory (tests) diff --git a/include/mercury-progressor/mercury-progressor.h b/include/mercury-progressor/mercury-progressor.h index 47f0ba4..1278052 100644 --- a/include/mercury-progressor/mercury-progressor.h +++ b/include/mercury-progressor/mercury-progressor.h @@ -40,6 +40,8 @@ typedef struct progressor_handle progressor_handle_t; +typedef struct margo_instance* margo_instance_id; /* avoid including margo.h if not present */ + /* * progressor_stats: run stats for a progressor and its handle. * note that the stats are zeroed each time the threads are started. @@ -74,6 +76,22 @@ extern "C" { progressor_handle_t *mercury_progressor_init(hg_class_t *hgclass, hg_context_t *hgcontext); +/** + * mercury_progressor_init_from_margo: allocate and init a mercury + * progressor and return a handle to it. The progressor will be + * created from a margo instance. Contrary to mercury_progressor_init, + * which does nto start the progress thread, the margo instance will + * already have a running progress thread. + * + * If mercury-progressor has not been built with Margo support, + * this function will fail and return NULL. + * + * @param mid margo instance to associate with the processor + * + * @return handle to progressor, or NULL (for allocation error) + */ +progressor_handle_t *mercury_progressor_init_from_margo(margo_instance_id mid); + /* * mercury_progressor_get_progtimeout: get the current timeout setting * that is used with HG_Progress() in the timeout loop. @@ -178,6 +196,19 @@ hg_class_t *mercury_progressor_hgclass(progressor_handle_t *hand); */ hg_context_t *mercury_progressor_hgcontext(progressor_handle_t *hand); +/* + * mercury_progressor_hgclass: return margo_instance_id associated with + * the progressor our handle points to. clients can safely cache + * the return value while the progressor is allocated. + * + * If mercury-progressor was not build with Margo support, this function + * will return NULL. + * + * @param hand the progressor handle of interest + * @return the margo_instance_id + */ +margo_instance_id mercury_progressor_mid(progressor_handle_t *hand); + /* * mercury_progressor_addrstring: return pointer to a C string containing * the address of our local mercury address. valid as long as the diff --git a/spack.yaml b/spack.yaml new file mode 100644 index 0000000..b556526 --- /dev/null +++ b/spack.yaml @@ -0,0 +1,20 @@ +# This spack.yaml file can be used to create a spack +# environment for testing purpose. From inside This +# directorty: +# +# ``` +# $ spack env create -d . +# $ spack env activate . +# $ spack install +# ``` +spack: + specs: + - cmake + - mercury~boostsys ^libfabric fabrics=tcp,rxm + - mochi-margo@main + concretizer: + unify: true + modules: + prefix_inspections: + lib: [LD_LIBRARY_PATH] + lib64: [LD_LIBRARY_PATH] diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 7153940..eb2f16f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -40,14 +40,24 @@ set (MERCURY_PROGRESSOR_VERSION # create library target (user can specify shared vs. static using # BUILD_SHARED_LIBS). arrange for users of our lib to get the correct "-I"'s # -add_library (mercury-progressor mercury-progressor.c) +if (${ENABLE_MARGO}) + add_library (mercury-progressor margo-progressor.c) +else () + add_library (mercury-progressor mercury-progressor.c) +endif () target_include_directories (mercury-progressor PUBLIC $) +# configure config.h +configure_file ("config.h.in" "config.h" @ONLY) +# add the build directory as include path +target_include_directories (mercury-progressor BEFORE PUBLIC + $) + # make sure our build includes are BEFORE a previously installed version target_include_directories (mercury-progressor BEFORE PUBLIC $) -target_link_libraries (mercury-progressor mercury) +target_link_libraries (mercury-progressor mercury ${EXTRA_LIBS}) # XXX: cmake 3.1 and newer define a Threads::Threads imported target # that we should switch too when we are ready to require 3.1 or better. # (3.1 was released late 2014) diff --git a/src/config.h.in b/src/config.h.in new file mode 100644 index 0000000..c40e7e2 --- /dev/null +++ b/src/config.h.in @@ -0,0 +1,6 @@ +#ifndef _CONFIG_H +#define _CONFIG_H + +#cmakedefine ENABLE_MARGO + +#endif diff --git a/src/margo-progressor.c b/src/margo-progressor.c new file mode 100644 index 0000000..e8112e3 --- /dev/null +++ b/src/margo-progressor.c @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2019-2023, Carnegie Mellon University. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY + * WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +/* + * margo-progressor.c mercury progressor component implemented with margo + * 13-Jan-2023 mdorier@anl.gov + */ + +#include +#include +#include +#include + +#include +#include + +#include "mercury-progressor.h" + +struct margo_progressor { + margo_instance_id mid; + _Atomic unsigned refcount; + int owns_mid; + char self_addr[128]; +}; + +struct progressor_handle { + struct margo_progressor* pg; + _Atomic unsigned needed; +}; + +/* + * API functions! + */ + +progressor_handle_t *mercury_progressor_init_from_margo(margo_instance_id mid) { + if(!mid) return NULL; + + hg_addr_t addr = HG_ADDR_NULL; + hg_return_t ret = margo_addr_self(mid, &addr); + if(ret != HG_SUCCESS) return NULL; + char self_addr[128]; + hg_size_t addr_size = 128; + ret = margo_addr_to_string(mid, self_addr, &addr_size, addr); + if(ret != HG_SUCCESS) return NULL; + + progressor_handle_t* pgh = calloc(1, sizeof(*pgh)); + pgh->pg = calloc(1, sizeof(*(pgh->pg))); + pgh->pg->mid = mid; + pgh->pg->refcount = 1; + strcpy(pgh->pg->self_addr, self_addr); + + return pgh; +} + +margo_instance_id mercury_progressor_mid(progressor_handle_t *hand) { + if(!hand) return NULL; + return hand->pg->mid; +} + +/* + * mercury_progressor_init: allocate and init progressor + */ +progressor_handle_t *mercury_progressor_init(hg_class_t *hgclass, + hg_context_t *hgcontext) { + struct margo_init_info init_info = {0}; + init_info.hg_class = hgclass; + init_info.hg_context = hgcontext; + init_info.json_config = "{\"use_progress_thread\":true,\"rpc_thread_count\":1}"; + margo_instance_id mid = margo_init_ext( + HG_Class_get_protocol(hgclass), + HG_Class_is_listening(hgclass), + &init_info); + if(!mid) return NULL; + progressor_handle_t* pgh = mercury_progressor_init_from_margo(mid); + if(!pgh) return NULL; + pgh->pg->owns_mid = 1; + return pgh; +} + +/* + * mercury_progressor_get_progtimeout: get progtimeout (msec) + */ +unsigned int mercury_progressor_get_progtimeout(progressor_handle_t *hand) { + if(!hand) return 0; + unsigned int timeout = 0; + margo_get_progress_timeout_ub_msec(hand->pg->mid, &timeout); + return timeout; +} + +/* + * mercury_progressor_set_progtimeout: set progtimeout (msec) + */ +void mercury_progressor_set_progtimeout(progressor_handle_t *hand, + unsigned int timeout) { + if(!hand) return; + margo_set_progress_timeout_ub_msec(hand->pg->mid, timeout); +} + +/* + * mercury_progressor_duphandle: create a duplicate handle. + */ +progressor_handle_t *mercury_progressor_duphandle(progressor_handle_t *hand) { + if(!hand) return NULL; + progressor_handle_t* new_pgh = calloc(1, sizeof(*new_pgh)); + new_pgh->pg = hand->pg; + new_pgh->pg->refcount++; + return new_pgh; +} + +/* + * mercury_progressor_freehandle: free a progressor handle. the + * caller should be holding the last reference to the handle. + */ +hg_return_t mercury_progressor_freehandle(progressor_handle_t *hand) { + if(!hand) return HG_SUCCESS; + if(--hand->pg->refcount == 0) { + if(hand->pg->owns_mid) + margo_finalize(hand->pg->mid); + free(hand->pg); + } + free(hand); + return HG_SUCCESS; +} + +/* + * mercury_progressor_getstats: get stats from progress thread + */ +hg_return_t mercury_progressor_getstats(progressor_handle_t *hand, + struct progressor_stats *ps) { + if(!hand) return HG_INVALID_ARG; + if(!ps) return HG_SUCCESS; + + margo_instance_id mid = hand->pg->mid; + + memset(ps, 0, sizeof(*ps)); + ps->is_running = 1; /* margo is always running */ + ps->needed = hand->needed; + ps->progressor_refcnt = hand->pg->refcount; + ps->nprogress = margo_get_num_progress_calls(mid); + ps->ntrigger = margo_get_num_trigger_calls(mid); + // note: can't fill in runtime and runusage + + return HG_SUCCESS; +} + +/* + * mercury_progressor_nprogress: extract nprogress from progressor + */ +uint64_t mercury_progressor_nprogress(progressor_handle_t *hand) { + return hand ? margo_get_num_progress_calls(hand->pg->mid) : 0; +} + +/* + * mercury_progressor_ntrigger: extract ntrigger from progressor + */ +uint64_t mercury_progressor_ntrigger(progressor_handle_t *hand) { + return hand ? margo_get_num_trigger_calls(hand->pg->mid) : 0; +} + +/* + * mercury_progressor_hgclass: extract hg_class_t* from progressor + */ +hg_class_t *mercury_progressor_hgclass(progressor_handle_t *hand) { + return hand ? margo_get_class(hand->pg->mid) : NULL; +} + +/* + * mercury_progressor_hgcontext: extract hg_context_t* from progressor + */ +hg_context_t *mercury_progressor_hgcontext(progressor_handle_t *hand) { + return hand ? margo_get_context(hand->pg->mid) : NULL; +} + +/* + * mercury_progressor_addrstring: extract local address string. + * valid as long as handle is active, no need to free return value. + */ +char *mercury_progressor_addrstring(progressor_handle_t *hand) { + return hand ? hand->pg->self_addr : NULL; +} + +/* + * mercury_progressor_needed: handle needs threads to be running + */ +hg_return_t mercury_progressor_needed(progressor_handle_t *hand) { + if(!hand) return HG_INVALID_ARG; + ++hand->needed; + return HG_SUCCESS; +} + +/* + * mercury_progressor_idle: progressor handle no longer needs progressor + * service running. + */ +hg_return_t mercury_progressor_idle(progressor_handle_t *hand) { + if(!hand) return HG_INVALID_ARG; + --hand->needed; + return HG_SUCCESS; +} diff --git a/src/mercury-progressor-config.cmake.in b/src/mercury-progressor-config.cmake.in index 9db734a..50993bb 100644 --- a/src/mercury-progressor-config.cmake.in +++ b/src/mercury-progressor-config.cmake.in @@ -8,4 +8,11 @@ find_dependency(mercury) include ("${CMAKE_CURRENT_LIST_DIR}/mercury-progressor-targets.cmake") +set (MERCURY_PROGRESSOR_USES_MARGO @ENABLE_MARGO@) + +if (MERCURY_PROGRESSOR_USES_MARGO) + find_dependency(PkgConfig) + pkg_check_modules (margo REQUIRED IMPORTED_TARGET margo) +endif () + # could include a macros file if one is used diff --git a/src/mercury-progressor.c b/src/mercury-progressor.c index 42d1f48..8dd6307 100644 --- a/src/mercury-progressor.c +++ b/src/mercury-progressor.c @@ -435,6 +435,16 @@ static void load_stats(struct progressor_stats *ps, struct progressor *pg) { * API functions! */ +progressor_handle_t *mercury_progressor_init_from_margo(margo_instance_id mid) { + (void)mid; + return NULL; +} + +margo_instance_id mercury_progressor_mid(progressor_handle_t *hand) { + (void)hand; + return NULL; +} + /* * mercury_progressor_init: allocate and init progressor */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4d1448c..eaab6cf 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -12,6 +12,12 @@ # 29-Oct-2019 chuck@ece.cmu.edu # -add_executable(test-progressor test-progressor.c) -target_link_libraries(test-progressor mercury-progressor) -add_test(test-progressor test-progressor) +if (${ENABLE_MARGO}) + add_executable (test-margo-progressor test-margo-progressor.c) + target_link_libraries (test-margo-progressor mercury-progressor) + add_test (test-margo-progressor test-margo-progressor) +endif () + +add_executable (test-progressor test-progressor.c) +target_link_libraries (test-progressor mercury-progressor) +add_test (test-progressor test-progressor) diff --git a/tests/test-margo-progressor.c b/tests/test-margo-progressor.c new file mode 100644 index 0000000..8f75f3b --- /dev/null +++ b/tests/test-margo-progressor.c @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2019, Carnegie Mellon University. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the University nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY + * WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include + +#include +#include +#include "mercury-progressor.h" + +void checkstat(char *tag, progressor_handle_t *p, + struct progressor_stats *psp, int rn, int nd, int rf) { + if (mercury_progressor_getstats(p, psp) != HG_SUCCESS) { + fprintf(stderr, "progressor stat %s failed\n", tag); + exit(1); + } + if (psp->needed != nd) { + fprintf(stderr, "progressor stat %s need check failed\n", tag); + exit(1); + } + if (psp->progressor_refcnt != rf) { + fprintf(stderr, "progressor stat %s ref check failed\n", tag); + exit(1); + } +} + +int main(int argc, char **argv) { + margo_instance_id mid; + hg_class_t *cls; + hg_context_t *ctx; + progressor_handle_t *phand, *duphand; + struct progressor_stats ps; + + mid = margo_init("na+sm", MARGO_SERVER_MODE, 1, 1); + if (!mid) { + fprintf(stderr, "margo_init failed\n"); + exit(1); + } + + phand = mercury_progressor_init_from_margo(mid); + + if (!phand) { + fprintf(stderr, "mercury_progressor_init_from_margo failed\n"); + exit(1); + } + + cls = mercury_progressor_hgclass(phand); + ctx = mercury_progressor_hgcontext(phand); + printf("my address: %s\n", mercury_progressor_addrstring(phand)); + + checkstat("check 0", phand, &ps, 0, 0, 1); + + duphand = mercury_progressor_duphandle(phand); + if (!duphand) { + fprintf(stderr, "progressor dup failed\n"); + exit(1); + } + + checkstat("dup check A0", phand, &ps, 0, 0, 2); + checkstat("dup check A1", duphand, &ps, 0, 0, 2); + + if (mercury_progressor_needed(duphand) != HG_SUCCESS) { + fprintf(stderr, "needed start failed\n"); + exit(1); + } + checkstat("dup check B0", phand, &ps, 1, 0, 2); + checkstat("dup check B1", duphand, &ps, 1, 1, 2); + + if (mercury_progressor_needed(duphand) != HG_SUCCESS) { + fprintf(stderr, "needed start failed\n"); + exit(1); + } + checkstat("dup check C0", phand, &ps, 1, 0, 2); + checkstat("dup check C1", duphand, &ps, 1, 2, 2); + + if (mercury_progressor_needed(phand) != HG_SUCCESS) { + fprintf(stderr, "needed start failed\n"); + exit(1); + } + checkstat("dup check D0", phand, &ps, 1, 1, 2); + checkstat("dup check D1", duphand, &ps, 1, 2, 2); + + if (mercury_progressor_idle(duphand) != HG_SUCCESS) { + fprintf(stderr, "idle failed\n"); + exit(1); + } + checkstat("dup check E0", phand, &ps, 1, 1, 2); + checkstat("dup check E1", duphand, &ps, 1, 1, 2); + + if (mercury_progressor_needed(duphand) != HG_SUCCESS) { + fprintf(stderr, "needed start failed\n"); + exit(1); + } + checkstat("dup check F0", phand, &ps, 1, 1, 2); + checkstat("dup check F1", duphand, &ps, 1, 2, 2); + + if (mercury_progressor_freehandle(duphand) != HG_SUCCESS) { + fprintf(stderr, "free dup failed\n"); + exit(1); + } + duphand = NULL; + checkstat("dup check G0", phand, &ps, 1, 1, 1); + + if (mercury_progressor_idle(phand) != HG_SUCCESS) { + fprintf(stderr, "idle failed\n"); + exit(1); + } + checkstat("dup check H0", phand, &ps, 0, 0, 1); + + if (mercury_progressor_needed(phand) != HG_SUCCESS) { + fprintf(stderr, "needed start failed\n"); + exit(1); + } + checkstat("dup check I0", phand, &ps, 1, 1, 1); + + if (mercury_progressor_freehandle(phand) != HG_SUCCESS) { + fprintf(stderr, "free phand failed\n"); + exit(1); + } + phand = NULL; + + margo_finalize(mid); + + exit(0); +} diff --git a/tests/test-progressor.c b/tests/test-progressor.c index 883cfe6..e0ce4d2 100644 --- a/tests/test-progressor.c +++ b/tests/test-progressor.c @@ -34,6 +34,7 @@ #include #include "mercury-progressor.h" +#include "config.h" void checkstat(char *tag, progressor_handle_t *p, struct progressor_stats *psp, int rn, int nd, int rf) { @@ -41,10 +42,17 @@ void checkstat(char *tag, progressor_handle_t *p, fprintf(stderr, "progressor stat %s failed\n", tag); exit(1); } +#ifndef ENABLE_MARGO if (psp->is_running != rn) { fprintf(stderr, "progressor stat %s run check failed\n", tag); exit(1); } +#else + if (psp->is_running != 1) { + fprintf(stderr, "progressor stat %s run check failed\n", tag); + exit(1); + } +#endif if (psp->needed != nd) { fprintf(stderr, "progressor stat %s need check failed\n", tag); exit(1); From 9032078994556b02bfaa6696e12a371b2ec58be0 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Fri, 13 Jan 2023 11:31:18 +0000 Subject: [PATCH 2/6] fixing memory leak --- src/margo-progressor.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/margo-progressor.c b/src/margo-progressor.c index e8112e3..51152e8 100644 --- a/src/margo-progressor.c +++ b/src/margo-progressor.c @@ -69,6 +69,8 @@ progressor_handle_t *mercury_progressor_init_from_margo(margo_instance_id mid) { hg_size_t addr_size = 128; ret = margo_addr_to_string(mid, self_addr, &addr_size, addr); if(ret != HG_SUCCESS) return NULL; + ret = margo_addr_free(mid, addr); + if(ret != HG_SUCCESS) return NULL; progressor_handle_t* pgh = calloc(1, sizeof(*pgh)); pgh->pg = calloc(1, sizeof(*(pgh->pg))); From 6dce6c220884a21db5acca705ad8d9e289138492 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Fri, 13 Jan 2023 11:45:23 +0000 Subject: [PATCH 3/6] added an actual RPC in the test --- tests/test-progressor.c | 107 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/tests/test-progressor.c b/tests/test-progressor.c index e0ce4d2..b63ca40 100644 --- a/tests/test-progressor.c +++ b/tests/test-progressor.c @@ -31,8 +31,10 @@ #include #include #include +#include #include +#include #include "mercury-progressor.h" #include "config.h" @@ -63,6 +65,44 @@ void checkstat(char *tag, progressor_handle_t *p, } } +MERCURY_GEN_PROC(sum_in_t, + ((int32_t)(x))\ + ((int32_t)(y))) + +MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret))) + +hg_return_t sum(hg_handle_t handle) +{ + hg_return_t ret; + sum_in_t in; + sum_out_t out; + + const struct hg_info* info = HG_Get_info(handle); + + ret = HG_Get_input(handle, &in); + assert(ret == HG_SUCCESS); + + out.ret = in.x + in.y; + printf("%d + %d = %d\n", in.x, in.y, in.x+in.y); + + ret = HG_Respond(handle, NULL, NULL, &out); + assert(ret == HG_SUCCESS); + + ret = HG_Free_input(handle, &in); + assert(ret == HG_SUCCESS); + + ret = HG_Destroy(handle); + assert(ret == HG_SUCCESS); + + return HG_SUCCESS; +} + +hg_return_t sum_completed(const struct hg_cb_info *info) { + volatile int *completed = info->arg; + *completed = 1; + return HG_SUCCESS; +} + int main(int argc, char **argv) { hg_class_t *cls; hg_context_t *ctx; @@ -93,6 +133,73 @@ int main(int argc, char **argv) { } printf("my address: %s\n", mercury_progressor_addrstring(phand)); + // register RPC + hg_id_t rpc_id = MERCURY_REGISTER(cls, "sum", sum_in_t, sum_out_t, sum); + + // get self address + hg_addr_t self_addr = HG_ADDR_NULL; + hg_return_t ret = HG_Addr_self(cls, &self_addr); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Addr_self failed (%d)\n", ret); + exit(1); + } + + // create RPC handle + hg_handle_t handle; + ret = HG_Create(ctx, self_addr, rpc_id, &handle); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Create failed (%d)\n", ret); + exit(1); + } + + // forward RPC + sum_in_t in; + in.x = 42; + in.y = 23; + volatile int completed = 0; + ret = HG_Forward(handle, sum_completed, (void*)&completed, &in); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Forward failed (%d)\n", ret); + exit(1); + } + + // ugly active loop + while(!completed) { usleep(100); } + + // get output + sum_out_t out; + ret = HG_Get_output(handle, &out); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Get_output failed (%d)\n", ret); + exit(1); + } + + if (out.ret != in.x + in.y) { + fprintf(stderr, "Output is incorrect\n"); + exit(1); + } + + // free output + ret = HG_Free_output(handle, &out); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Destroy failed (%d)\n", ret); + exit(1); + } + + // free handle + ret = HG_Destroy(handle); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Destroy failed (%d)\n", ret); + exit(1); + } + + // free self address + ret = HG_Addr_free(cls, self_addr); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Addr_free failed (%d)\n", ret); + exit(1); + } + checkstat("check 0", phand, &ps, 0, 0, 1); duphand = mercury_progressor_duphandle(phand); From acecfbdf7dbac903e591b2da941c0037adca6b6b Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Fri, 13 Jan 2023 11:50:03 +0000 Subject: [PATCH 4/6] some modifications to the mercury test --- tests/test-progressor.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/test-progressor.c b/tests/test-progressor.c index b63ca40..910ac72 100644 --- a/tests/test-progressor.c +++ b/tests/test-progressor.c @@ -65,17 +65,17 @@ void checkstat(char *tag, progressor_handle_t *p, } } -MERCURY_GEN_PROC(sum_in_t, +MERCURY_GEN_PROC(op_in_t, ((int32_t)(x))\ ((int32_t)(y))) -MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret))) +MERCURY_GEN_PROC(op_out_t, ((int32_t)(ret))) hg_return_t sum(hg_handle_t handle) { hg_return_t ret; - sum_in_t in; - sum_out_t out; + op_in_t in; + op_out_t out; const struct hg_info* info = HG_Get_info(handle); @@ -134,7 +134,7 @@ int main(int argc, char **argv) { printf("my address: %s\n", mercury_progressor_addrstring(phand)); // register RPC - hg_id_t rpc_id = MERCURY_REGISTER(cls, "sum", sum_in_t, sum_out_t, sum); + hg_id_t sum_id = HG_Register_name(cls, "sum", hg_proc_op_in_t, hg_proc_op_out_t, sum); // get self address hg_addr_t self_addr = HG_ADDR_NULL; @@ -146,14 +146,14 @@ int main(int argc, char **argv) { // create RPC handle hg_handle_t handle; - ret = HG_Create(ctx, self_addr, rpc_id, &handle); + ret = HG_Create(ctx, self_addr, sum_id, &handle); if (ret != HG_SUCCESS) { fprintf(stderr, "HG_Create failed (%d)\n", ret); exit(1); } // forward RPC - sum_in_t in; + op_in_t in; in.x = 42; in.y = 23; volatile int completed = 0; @@ -167,7 +167,7 @@ int main(int argc, char **argv) { while(!completed) { usleep(100); } // get output - sum_out_t out; + op_out_t out; ret = HG_Get_output(handle, &out); if (ret != HG_SUCCESS) { fprintf(stderr, "HG_Get_output failed (%d)\n", ret); From 924ea6e1556c2d819e337db22bd3291ffc41ea9d Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Fri, 13 Jan 2023 12:03:06 +0000 Subject: [PATCH 5/6] completed margo test --- tests/test-margo-progressor.c | 217 ++++++++++++++++++++++++++++++++-- 1 file changed, 206 insertions(+), 11 deletions(-) diff --git a/tests/test-margo-progressor.c b/tests/test-margo-progressor.c index 8f75f3b..9ba339d 100644 --- a/tests/test-margo-progressor.c +++ b/tests/test-margo-progressor.c @@ -31,10 +31,13 @@ #include #include #include +#include #include #include +#include #include "mercury-progressor.h" +#include "config.h" void checkstat(char *tag, progressor_handle_t *p, struct progressor_stats *psp, int rn, int nd, int rf) { @@ -42,6 +45,10 @@ void checkstat(char *tag, progressor_handle_t *p, fprintf(stderr, "progressor stat %s failed\n", tag); exit(1); } + if (psp->is_running != 1) { + fprintf(stderr, "progressor stat %s run check failed\n", tag); + exit(1); + } if (psp->needed != nd) { fprintf(stderr, "progressor stat %s need check failed\n", tag); exit(1); @@ -52,30 +59,218 @@ void checkstat(char *tag, progressor_handle_t *p, } } +MERCURY_GEN_PROC(op_in_t, + ((int32_t)(x))\ + ((int32_t)(y))) + +MERCURY_GEN_PROC(op_out_t, ((int32_t)(ret))) + +// sum is a pure-mercury (callback-based) RPC +hg_return_t sum(hg_handle_t handle) +{ + hg_return_t ret; + op_in_t in; + op_out_t out; + + const struct hg_info* info = HG_Get_info(handle); + + ret = HG_Get_input(handle, &in); + assert(ret == HG_SUCCESS); + + out.ret = in.x + in.y; + printf("%d + %d = %d\n", in.x, in.y, out.ret); + + ret = HG_Respond(handle, NULL, NULL, &out); + assert(ret == HG_SUCCESS); + + ret = HG_Free_input(handle, &in); + assert(ret == HG_SUCCESS); + + ret = HG_Destroy(handle); + assert(ret == HG_SUCCESS); + + return HG_SUCCESS; +} + +hg_return_t sum_completed(const struct hg_cb_info *info) { + volatile int *completed = info->arg; + *completed = 1; + return HG_SUCCESS; +} + +// prod is a Margo-based RPC +static void prod(hg_handle_t h) +{ + hg_return_t ret; + + op_in_t in; + op_out_t out; + + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + out.ret = in.x * in.y; + printf("%d * %d = %d\n", in.x, in.y, out.ret); + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); + +} +DEFINE_MARGO_RPC_HANDLER(prod) + int main(int argc, char **argv) { - margo_instance_id mid; hg_class_t *cls; hg_context_t *ctx; progressor_handle_t *phand, *duphand; struct progressor_stats ps; - mid = margo_init("na+sm", MARGO_SERVER_MODE, 1, 1); - if (!mid) { - fprintf(stderr, "margo_init failed\n"); + cls = HG_Init("na+sm", HG_TRUE); + if (!cls) { + fprintf(stderr, "HG_Init failed\n"); exit(1); } - phand = mercury_progressor_init_from_margo(mid); + ctx = HG_Context_create(cls); + if (!ctx) { + fprintf(stderr, "HG_Context_create failed\n"); + exit(1); + } + phand = mercury_progressor_init(cls, ctx); if (!phand) { - fprintf(stderr, "mercury_progressor_init_from_margo failed\n"); + fprintf(stderr, "mercury_progressor_init failed\n"); + exit(1); + } + if (mercury_progressor_hgclass(phand) != cls || + mercury_progressor_hgcontext(phand) != ctx) { + fprintf(stderr, "progressor access check failed\n"); exit(1); } - - cls = mercury_progressor_hgclass(phand); - ctx = mercury_progressor_hgcontext(phand); printf("my address: %s\n", mercury_progressor_addrstring(phand)); + margo_instance_id mid = mercury_progressor_mid(phand); + if (!mid) { + fprintf(stderr, "mercury_progressor_mid failed\n"); + exit(1); + } + + // register Mercury-based RPC + hg_id_t sum_id = HG_Register_name(cls, "sum", hg_proc_op_in_t, hg_proc_op_out_t, sum); + + // register Margo-based RPC + hg_id_t prod_id = MARGO_REGISTER(mid, "prod", op_in_t, op_out_t, prod); + + // get self address + hg_addr_t self_addr = HG_ADDR_NULL; + hg_return_t ret = HG_Addr_self(cls, &self_addr); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Addr_self failed (%d)\n", ret); + exit(1); + } + + // create RPC handle for Mercury-based RPC + hg_handle_t handle1; + ret = HG_Create(ctx, self_addr, sum_id, &handle1); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Create failed (%d)\n", ret); + exit(1); + } + + // create RPC handle for Margo-based RPC + hg_handle_t handle2; + ret = margo_create(mid, self_addr, prod_id, &handle2); + if (ret != HG_SUCCESS) { + fprintf(stderr, "margo_create failed (%d)\n", ret); + exit(1); + } + + // forward Mercury-based RPC + op_in_t in; + in.x = 42; + in.y = 23; + volatile int completed = 0; + ret = HG_Forward(handle1, sum_completed, (void*)&completed, &in); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Forward failed (%d)\n", ret); + exit(1); + } + + // forward Margo-based RPC + ret = margo_forward(handle2, &in); + if (ret != HG_SUCCESS) { + fprintf(stderr, "margo_forward failed (%d)\n", ret); + exit(1); + } + + // ugly active loop for the Mercury-based RPC + while(!completed) { usleep(100); } + + // get output from the Mercury RPC + op_out_t out; + ret = HG_Get_output(handle1, &out); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Get_output failed (%d)\n", ret); + exit(1); + } + + if (out.ret != in.x + in.y) { + fprintf(stderr, "Output is incorrect\n"); + exit(1); + } + + // free output from the Mercury-based RPC + ret = HG_Free_output(handle1, &out); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Free_output failed (%d)\n", ret); + exit(1); + } + + // free handle from the Mercury-based RPC + ret = HG_Destroy(handle1); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Destroy failed (%d)\n", ret); + exit(1); + } + + // get output from the Margo RPC + ret = margo_get_output(handle2, &out); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Get_output failed (%d)\n", ret); + exit(1); + } + + if (out.ret != in.x * in.y) { + fprintf(stderr, "Output is incorrect (margo)\n"); + exit(1); + } + + // free output from the Margo RPC + ret = margo_free_output(handle2, &out); + if (ret != HG_SUCCESS) { + fprintf(stderr, "margo_free_output failed (%d)\n", ret); + exit(1); + } + + // free handle from the Margo RPC + ret = margo_destroy(handle2); + if (ret != HG_SUCCESS) { + fprintf(stderr, "margo_destroy failed (%d)\n", ret); + exit(1); + } + + // free self address + ret = HG_Addr_free(cls, self_addr); + if (ret != HG_SUCCESS) { + fprintf(stderr, "HG_Addr_free failed (%d)\n", ret); + exit(1); + } + checkstat("check 0", phand, &ps, 0, 0, 1); duphand = mercury_progressor_duphandle(phand); @@ -147,7 +342,7 @@ int main(int argc, char **argv) { } phand = NULL; - margo_finalize(mid); - + HG_Context_destroy(ctx); + HG_Finalize(cls); exit(0); } From a730d7a6c6dc077a46f4e95808931e93c3abf63c Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Fri, 13 Jan 2023 12:09:03 +0000 Subject: [PATCH 6/6] fixed test-progressor.c when using mercury instead of margo --- tests/test-progressor.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/test-progressor.c b/tests/test-progressor.c index 910ac72..aedd714 100644 --- a/tests/test-progressor.c +++ b/tests/test-progressor.c @@ -152,6 +152,9 @@ int main(int argc, char **argv) { exit(1); } + // make sure progress loop is runnning + mercury_progressor_needed(phand); + // forward RPC op_in_t in; in.x = 42; @@ -164,7 +167,7 @@ int main(int argc, char **argv) { } // ugly active loop - while(!completed) { usleep(100); } + while(!completed) { usleep(100);} // get output op_out_t out; @@ -200,6 +203,9 @@ int main(int argc, char **argv) { exit(1); } + // stop the progress loop + mercury_progressor_idle(phand); + checkstat("check 0", phand, &ps, 0, 0, 1); duphand = mercury_progressor_duphandle(phand);