diff --git a/CMakeLists.txt b/CMakeLists.txt index e1f9ba682..f7e373d55 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,7 +38,9 @@ set(ANAKIN_FRAMEWORK ${ANAKIN_ROOT}/framework) set(ANAKIN_LITE_FRAMEWORK ${ANAKIN_FRAMEWORK}/lite) set(ANAKIN_UTILS ${ANAKIN_ROOT}/utils) set(ANAKIN_THIRD_PARTY_PATH ${ANAKIN_ROOT}/third-party) +set(ANAKIN_TEMP_THIRD_PARTY_PATH ${CMAKE_BINARY_DIR}/third-party) set(ANAKIN_MODEL_PARSER ${ANAKIN_FRAMEWORK}/model_parser) +set(ANAKIN_SERVICE ${ANAKIN_FRAMEWORK}/service) set(ANAKIN_SABER ${ANAKIN_ROOT}/saber) set(ANAKIN_LITE_SABER ${ANAKIN_SABER}/lite) set(ANAKIN_UNIT_TEST ${ANAKIN_ROOT}/test) @@ -91,7 +93,7 @@ anakin_option(COMPILE_PTX "Returns a list of PTX files generated from src." NO i # common build options -anakin_option(ENABLE_DEBUG "Enable DEBUG(default) mode." NO) +anakin_option(ENABLE_DEBUG "Enable DEBUG(default) mode." YES) anakin_option(ENABLE_VERBOSE_MSG "Enable verbose=1 : compile msg during make." NO) anakin_option(DISABLE_ALL_WARNINGS "Disable all the warning msg during compile." YES) anakin_option(ENABLE_NOISY_WARNINGS "Enable noisy warning msg during compile." NO if DISABLE_ALL_WARNINGS) @@ -116,6 +118,8 @@ anakin_option(BUILD_WITH_UNIT_TEST "Build anakin unit test components." YES) anakin_option(BUILD_LITE "Build anakin lite components." NO) +anakin_option(BUILD_RPC "Build anakin rpc service components." 
YES) + # build examples anakin_option(BUILD_EXAMPLES "build detection and classification examples" NO) @@ -164,7 +168,6 @@ if(USE_CUDA) endif() if(USE_X86_PLACE) - set(ANAKIN_TEMP_THIRD_PARTY_PATH ${CMAKE_BINARY_DIR}/third-party) if(USE_MKLML) include(cmake/external/mklml.cmake) endif() @@ -185,6 +188,9 @@ include(cmake/gather.cmake) # fetch files of model_parser add_subdirectory(${ANAKIN_MODEL_PARSER}) add_subdirectory(${ANAKIN_SABER}) +if(BUILD_RPC) + add_subdirectory(${ANAKIN_SERVICE}) +endif() add_subdirectory(${ANAKIN_FRAMEWORK}) if(BUILD_WITH_UNIT_TEST) @@ -201,6 +207,3 @@ endif() anakin_print_statistic() - -#set(executable_output_path ${PROJECT_BINARY_DIR}/unit_test) - diff --git a/cmake/compiler_options.cmake b/cmake/compiler_options.cmake index ef5e953c4..00e29a3b6 100644 --- a/cmake/compiler_options.cmake +++ b/cmake/compiler_options.cmake @@ -109,6 +109,7 @@ if(USE_CUDA) anakin_add_compile_option(-G NVCC) anakin_add_compile_option(-g NVCC) anakin_add_compile_option(-std=c++11 NVCC) + anakin_add_compile_option("--default-stream per-thread" NVCC) anakin_add_compile_option(-Wno-deprecated-gpu-targets NVCC) # suppress warning by architectures are deprecated (2.0,2.1) else() anakin_add_compile_option("-Xcompiler -fPIC" NVCC) diff --git a/cmake/cuda.cmake b/cmake/cuda.cmake index 1f1c1154b..c0c6887af 100644 --- a/cmake/cuda.cmake +++ b/cmake/cuda.cmake @@ -78,7 +78,7 @@ endmacro() # ---------------------------------------------------------------------------- macro(anakin_find_cudnn) set(CUDNN_ROOT "" CACHE PATH "CUDNN root dir.") - find_path(CUDNN_INCLUDE_DIR cudnn.h PATHS ${CUDNN_ROOT} ${CUDNN_ROOT}/include + find_path(CUDNN_INCLUDE_DIR cudnn.h PATHS ${CUDNN_ROOT} $ENV{CUDNN_ROOT} $ENV{CUDNN_ROOT}/include $ENV{CUDNN_INCLUDE_DIR} @@ -147,6 +147,9 @@ macro(anakin_find_cuda) if(USE_CURAND) list(APPEND ANAKIN_LINKER_LIBS ${CUDA_curand_LIBRARY}) endif() + if(BUILD_RPC) + list(APPEND ANAKIN_LINKER_LIBS ${CUDA_INCLUDE_DIRS}/../lib64/stubs/libnvidia-ml.so) + endif() 
list(APPEND ANAKIN_LINKER_LIBS ${CUDA_CUDART_LIBRARY}) else() message(FATAL_ERROR "Cuda SHARED lib Could not found !") diff --git a/cmake/external/mklml.cmake b/cmake/external/mklml.cmake index c522491fc..83a521878 100644 --- a/cmake/external/mklml.cmake +++ b/cmake/external/mklml.cmake @@ -13,10 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. #=============================================================================== -anakin_find_mklml() -if(MKLML_FOUND) - return() -endif() # download mklml package is only for iomp so far include(ExternalProject) diff --git a/cmake/find_modules.cmake b/cmake/find_modules.cmake index aef2b3a77..16c495211 100644 --- a/cmake/find_modules.cmake +++ b/cmake/find_modules.cmake @@ -235,19 +235,18 @@ macro(anakin_find_gtest) endif() endmacro() - -macro(anakin_find_gflags) - set(GFLAGS_INCLUDE_DIR ${ANAKIN_ROOT}/third-party/gflags/include) - find_library(GFLAGS_LIBRARY NAMES libgflags.so - PATHS ${GFLAGS_INCLUDE_DIR}/../lib - DOC "library path for gflags.") - if(GFLAGS_INCLUDE_DIR AND GFLAGS_LIBRARY) - set(GFLAGS_FOUND TRUE) - endif() - if(GFLAGS_FOUND) - message(STATUS "Found gflags in ${GFLAGS_INCLUDE_DIR}") - include_directories(${GFLAGS_INCLUDE_DIR}) - list(APPEND ANAKIN_LINKER_LIBS ${GFLAGS_LIBRARY}) +macro(anakin_find_google_flags) + set(GFLAGS_ROOT_DIR "/usr/include" CACHE PATH "gflags path") + find_path(GFLAGS_INCLUDE_DIRS gflags/gflags.h PATHS ${GFLAGS_ROOT_DIR}) + + find_library(GFLAGS_LIBRARIES NAMES libgflags.so + PATHS ${GFLAGS_ROOT_DIR}/../lib64 + DOC "library path for gflags.") + if(GFLAGS_INCLUDE_DIRS AND GFLAGS_LIBRARIES) + include_directories(SYSTEM ${GFLAGS_INCLUDE_DIRS}) + list(APPEND ANAKIN_DEMO_LIBRARIES ${GFLAGS_LIBRARIES}) + else() + message(SEND_ERROR "Could not found gflags !") endif() endmacro() @@ -328,6 +327,20 @@ macro(anakin_find_protobuf) endif() endmacro() +macro(anakin_find_baidu_rpc) + set(BAIDU_RPC_ROOT "/usr/local/" CACHE PATH "baidu 
rpc root dir") + find_path(RPC_INCLUDE_DIR server.h PATHS ${BAIDU_RPC_ROOT}/include/brpc/ $ENV{BAIDU_RPC_ROOT}/include/brpc/) + find_library(RPC_LIBRARY NAMES libbrpc.so + PATHS ${BAIDU_RPC_ROOT}/lib $ENV{BAIDU_RPC_ROOT}/lib + DOC "library path for baidu rpc.") + if(RPC_INCLUDE_DIR AND RPC_LIBRARY) + include_directories(${BAIDU_RPC_ROOT}/include) + list(APPEND ANAKIN_LINKER_LIBS ${RPC_LIBRARY}) + else() + message(SEND_ERROR "Could not find baidu-rpc!") + endif() +endmacro() + macro(anakin_find_openmp) find_package(OpenMP REQUIRED) if(OPENMP_FOUND OR OpenMP_CXX_FOUND) diff --git a/cmake/gather.cmake b/cmake/gather.cmake index 03a2cac83..ba2721a71 100644 --- a/cmake/gather.cmake +++ b/cmake/gather.cmake @@ -49,8 +49,12 @@ if(USE_PROTOBUF) anakin_protos_processing() endif() +if(BUILD_RPC) + anakin_find_baidu_rpc() +endif() + if (USE_GFLAGS) - anakin_find_gflags() + anakin_find_google_flags() endif() if(USE_MKL) diff --git a/cmake/utils.cmake b/cmake/utils.cmake index 60dc01593..ae78035ac 100644 --- a/cmake/utils.cmake +++ b/cmake/utils.cmake @@ -239,31 +239,26 @@ endfunction() # ---------------------------------------------------------------------------- # section: generate the protobuf .h and .cpp files. # ---------------------------------------------------------------------------- -function(anakin_protos_processing) - set(PROTO_SRC_PATH ${ANAKIN_MODEL_PARSER}/proto) - set(__working_dir ${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/PROTO_TEMP/) - - anakin_fetch_files_with_suffix(${PROTO_SRC_PATH} "proto" PROTO_SRC_FILES) - foreach(__file ${PROTO_SRC_FILES}) - exec_program(${PROTOBUF_PROTOC_EXECUTABLE} ${__working_dir} ARGS " -I=${PROTO_SRC_PATH} --cpp_out=. 
${__file}" - OUTPUT_VARIABLE OUTPUT - RETURN_VALUE VALUE) - if(NOT VALUE) - anakin_fetch_files_with_suffix(${__working_dir} "h" PROTO_GENERATE_H) - # get *.cpp or *.cc - anakin_fetch_files_with_suffix(${__working_dir} "c*" PROTO_GENERATE_C) - foreach(__include_file ${PROTO_GENERATE_H}) - exec_program(mv ARGS ${__include_file} ${PROTO_SRC_PATH} - OUTPUT_VARIABLE __out - RETURN_VALUE __value) - endforeach() - foreach(__src_file ${PROTO_GENERATE_C}) - if(POLICY CMP0007) - cmake_policy(PUSH) - cmake_policy(SET CMP0007 NEW) - endif() - string(REPLACE "." ";" SRC_LIST ${__src_file}) - list(GET SRC_LIST -1 __src_file_name_suffix) +function(anakin_gen_pb proto_src_path) + set(__working_dir ${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/PROTO_TEMP/) + foreach(__proto_file ${ARGN}) + exec_program(${PROTOBUF_PROTOC_EXECUTABLE} ${__working_dir} ARGS " -I=${proto_src_path} --cpp_out=. ${__proto_file}" + OUTPUT_VARIABLE OUTPUT RETURN_VALUE VALUE) + if(NOT VALUE) + anakin_fetch_files_with_suffix(${__working_dir} "h" PROTO_GENERATE_H) + # get *.cpp or *.cc + anakin_fetch_files_with_suffix(${__working_dir} "c*" PROTO_GENERATE_C) + foreach(__include_file ${PROTO_GENERATE_H}) + exec_program(mv ARGS ${__include_file} ${proto_src_path} + OUTPUT_VARIABLE __out RETURN_VALUE __value) + endforeach() + foreach(__src_file ${PROTO_GENERATE_C}) + if(POLICY CMP0007) + cmake_policy(PUSH) + cmake_policy(SET CMP0007 NEW) + endif() + string(REPLACE "." 
";" SRC_LIST ${__src_file}) + list(GET SRC_LIST -1 __src_file_name_suffix) list(GET SRC_LIST -3 __src_file_name) string(REPLACE "/" ";" SRC_LIST_PATH ${__src_file_name}) @@ -274,18 +269,31 @@ function(anakin_protos_processing) else() set(__full_src_filename "${__pure_src_file_name}.pb.cc") endif() - #message(STATUS " first ---> ${__working_dir}${__full_src_filename} ${ANAKIN_ROOT}/src/${__pure_src_file_name}.pb.cpp") - exec_program(mv ARGS " ${__working_dir}${__full_src_filename} ${PROTO_SRC_PATH}/${__pure_src_file_name}.pb.cpp" + exec_program(mv ARGS " ${__working_dir}${__full_src_filename} ${proto_src_path}/${__pure_src_file_name}.pb.cpp" OUTPUT_VARIABLE __out RETURN_VALUE __value) if(POLICY CMP0007) cmake_policy(POP) endif() - endforeach() - else() - message(FATAL_ERROR "anakin_protos_processing : ${__file} \n error msg: ${OUTPUT}") - endif() - endforeach() + endforeach() + else() + message(FATAL_ERROR "anakin_gen_bp: ${__file} \n error msg: ${OUTPUT}") + endif() + endforeach() +endfunction() + +function(anakin_protos_processing) + set(PROTO_SRC_PATH ${ANAKIN_MODEL_PARSER}/proto) + set(SERVICE_API_SRC_PATH ${ANAKIN_SERVICE}/api) + + set(__working_dir ${CMAKE_BINARY_DIR}${CMAKE_FILES_DIRECTORY}/PROTO_TEMP/) + + anakin_fetch_files_with_suffix(${PROTO_SRC_PATH} "proto" PROTO_SRC_FILES) + anakin_fetch_files_with_suffix(${SERVICE_API_SRC_PATH} "proto" SERVICE_API_PROTO_SRC_FILES) + anakin_gen_pb(${PROTO_SRC_PATH} ${PROTO_SRC_FILES}) + if(BUILD_RPC) + anakin_gen_pb(${SERVICE_API_SRC_PATH} ${SERVICE_API_PROTO_SRC_FILES}) + endif() endfunction() # ---------------------------------------------------------------------------- diff --git a/framework/CMakeLists.txt b/framework/CMakeLists.txt index fe16223a7..47cf6edb4 100644 --- a/framework/CMakeLists.txt +++ b/framework/CMakeLists.txt @@ -71,4 +71,3 @@ if(UNIX OR APPLE) PATTERN "*.inl") endif() endif() -install(FILES ${CMAKE_BINARY_DIR}/anakin_config.h DESTINATION ${PROJECT_SOURCE_DIR}/output/) diff --git 
a/framework/core/net/net.cpp b/framework/core/net/net.cpp index c972c0c26..d7b640121 100755 --- a/framework/core/net/net.cpp +++ b/framework/core/net/net.cpp @@ -73,9 +73,9 @@ void Net::init(graph::Graph& for (auto& node_name : node_names_in_exec_order) { auto node_ptr = (*_graph_p)[node_name]; //LOG(ERROR) << "get node " << node_name << ", op type " << node_ptr->get_op_name(); - if (node_ptr->get_op_name() == "Output") { + /*if (node_ptr->get_op_name() == "Output") { continue; - } + }*/ // create operations auto* op_pointer = OpFactory::Global()[node_ptr->get_op_name()]; @@ -145,10 +145,7 @@ void Net::init(graph::Graph& // infer basic shape and parsing parameter from graph for (auto& node_name : node_names_in_exec_order) { auto node_ptr = (*_graph_p)[node_name]; - //LOG(ERROR) << "get node " << node_name << ", op type " << node_ptr->get_op_name(); - if (node_ptr->get_op_name() == "Output") { - continue; - } + #ifdef ENABLE_OP_TIMER if (std::string::npos != (node_ptr->get_op_name()).find("Conv") || std::string::npos != (node_ptr->get_op_name()).find("Deconv")) { @@ -203,10 +200,10 @@ void Net::init(graph::Graph& int dil_h = dilation_rate_val.vector()[0]; int dil_w = dilation_rate_val.vector()[1]; - if ((group_val == 1) && (k_w == 3 && k_h == 3 && dil_h == 1 && dil_w == 1)) { - node_ptr->set_op(OpFactory::Global()["Sass"+node_ptr->get_op_name()]); - node_ptr->get_op_name() = "Sass" + node_ptr->get_op_name(); - } else { + if ((group_val == 1) && (k_w == 3 && k_h == 3 && dil_h == 1 && dil_w == 1)) { + node_ptr->set_op(OpFactory::Global()["Sass"+node_ptr->get_op_name()]); + node_ptr->get_op_name() = "Sass" + node_ptr->get_op_name(); + } else { LOG(ERROR) << "node_ptr->get_op_name() sass not support yet."; auto *op_pointer = OpFactory::Global()[node_ptr->get_op_name()]; node_ptr->set_op(op_pointer); @@ -215,8 +212,7 @@ void Net::init(graph::Graph& auto *op_pointer = OpFactory::Global()[node_ptr->get_op_name()]; node_ptr->set_op(op_pointer); } - } - else { + } else { 
auto *op_pointer = OpFactory::Global()[node_ptr->get_op_name()]; if (op_pointer == nullptr) { CHECK(false)<< node_name << ", type " << node_ptr->get_op_name() << " is null"; @@ -337,72 +333,68 @@ void Net::prediction() { int i = 0; for(auto& executer : _exec_funcs) { - if (RunType == OpRunType::SYNC || executer.need_sync) { + if (RunType == OpRunType::SYNC || executer.need_sync || executer.op_name == "Output") { for(int i = 0; i < executer.ins.size(); i++) { - // sync event record in multi_stream + // sync event record in multi_stream or syn when encountering output op executer.ins[i]->sync(); } } #ifdef ENABLE_DEBUG LOG(ERROR) << " executer : " << executer.name << " (" << executer.op_name << ") "; - for(auto in : executer.ins) { - LOG(ERROR) << " \\in shape " << in->valid_shape()[0] - << " " << in->valid_shape()[1] - << " " << in->valid_shape()[2] - << " " << in->valid_shape()[3] - << " valid_size: " << in->valid_size() - << " realsize: " << in->size() - << " offset_size "<get_seq_offset().size(); + for(auto in : executer.ins) { + LOG(ERROR) << " \\in shape " << in->valid_shape()[0] + << " " << in->valid_shape()[1] + << " " << in->valid_shape()[2] + << " " << in->valid_shape()[3] + << " valid_size: " << in->valid_size() + << " realsize: " << in->size() + << " offset_size "<get_seq_offset().size(); } #endif #ifdef ENABLE_OP_TIMER - Context ctx(0, 0, 0); - saber::SaberTimer my_time; - my_time.start(ctx); + Context ctx(0, 0, 0); + saber::SaberTimer my_time; + my_time.start(ctx); #endif - if (executer.op_name != "Input") { - executer.infer_shape(); - executer.launch(); - } - - for(int i = 0; i < executer.outs.size(); i++) { - executer.outs[i]->record_event(executer.ctx_p->get_compute_stream()); - } -#ifdef ENABLE_OP_TIMER - for (int i = 0; i < executer.outs.size(); i++) { - // record - executer.outs[i]->record_event(executer.ctx_p->get_compute_stream()); - executer.outs[i]->sync(); - } - my_time.end(ctx); - _op_time[op_id++] += my_time.get_average_ms(); + if 
(executer.op_name != "Input" && executer.op_name != "Output") { + executer.infer_shape(); + executer.launch(); + } + + for(int i = 0; i < executer.outs.size(); i++) { + executer.outs[i]->record_event(executer.ctx_p->get_compute_stream()); + } + +#ifdef ENABLE_OP_TIMER + for (int i = 0; i < executer.outs.size(); i++) { + // record + executer.outs[i]->record_event(executer.ctx_p->get_compute_stream()); + executer.outs[i]->sync(); + } + my_time.end(ctx); + _op_time[op_id++] += my_time.get_average_ms(); #endif - //LOG(INFO)<< "op: " << executer.name<<"(" << executer.op_name <<") === infer+launch time "< offset=out->get_seq_offset(); - LOG(INFO)<<"print offset of "< offset=out->get_seq_offset(); + LOG(INFO)<<"print offset of "<data(); + LOG(ERROR) << " |---out avg " << tensor_average(out); } - LOG(INFO)<<" end print offset of "<data(); -#ifdef USE_X86_PLACE -// for (int i = 0; i < 10; ++i) { -// std::cout << out->data()[i]<<" "; -// } -#endif - LOG(ERROR) << " |---out avg " << tensor_average(out); - } #ifdef USE_ARM_PLACE int idx = 0; @@ -421,7 +413,6 @@ void Net::prediction() { sum += *ptr_data; ptr_data++; } - //LOG(INFO) << "channel: " << j << ", mean value :" << sum_c / (w * h); } LOG(INFO) << executer.name << ", tensor idx: " << idx << ", mean value :" << sum / size << ", num: " << out->num() << \ @@ -660,8 +651,7 @@ template Status Net::init_env(graph::Graph& graph) { LOG(WARNING) << "Detect and initial " << graph.get_ins().size() << " lanes."; // fixme, multi_stream error - //Env::env_init(graph.get_ins().size()); - Env::env_init(1); + Env::env_init(graph.get_ins().size()); LOG(WARNING) << "Current used device id : " << TargetWrapper::get_device_id(); return Status::OK(); } diff --git a/framework/core/net/net.h b/framework/core/net/net.h index ceae98fd6..8094deaf7 100644 --- a/framework/core/net/net.h +++ b/framework/core/net/net.h @@ -28,19 +28,19 @@ namespace anakin { template class Net { public: - explicit Net(bool need_summary = false); + explicit Net(bool 
need_summary = true); /** * \brief Construct a net by graph. * This construction should be use in thread call and make sure thread safety. */ - explicit Net(graph::Graph&, bool need_summary = false); + explicit Net(graph::Graph&, bool need_summary = true); /** * \brief Construct a net by graph, init with specified context. * This construction should be use in thread call and make sure thread safety. */ - explicit Net(graph::Graph&, OpContextPtr ctx, bool need_summary = false); + explicit Net(graph::Graph&, OpContextPtr ctx, bool need_summary = true); ~Net(); diff --git a/framework/core/net/worker.cpp b/framework/core/net/worker.cpp index d5c0376ac..c63bab977 100644 --- a/framework/core/net/worker.cpp +++ b/framework/core/net/worker.cpp @@ -90,17 +90,25 @@ void Worker::register_interior_edges(std::string b } template -std::vector > Worker::sync_prediction(std::vector::type, Dtype> >& net_ins_list) { - auto task = [&](std::vector::type, Dtype> >& ins) -> std::vector > { +std::future::type, Dtype> > > +Worker::sync_prediction(std::vector::type, Dtype> >& net_ins_list) { + auto task = [&](std::vector::type, Dtype> >& ins) + -> std::vector::type, Dtype> > { auto& net = MultiThreadModel::Global().get_net(std::this_thread::get_id()); //fill the graph inputs for(int i = 0; i < _inputs_in_order.size(); i++) { + for(int j=0; j<10; j++) { + LOG(INFO) << "------> data " << ins[i].mutable_data()[j];; + } auto d_tensor_in_p = net.get_in(_inputs_in_order[i]); - d_tensor_in_p->reshape(ins[i]->valid_shape()); - d_tensor_in_p->copy_from(*ins[i]); - d_tensor_in_p->set_seq_offset(ins[i]->get_seq_offset()); + d_tensor_in_p->reshape(ins[i].valid_shape()); + d_tensor_in_p->copy_from(ins[i]); + d_tensor_in_p->set_seq_offset(ins[i].get_seq_offset()); } + Context ctx(0, 0, 0); + saber::SaberTimer my_time; + my_time.start(ctx); #ifdef ENABLE_OP_TIMER Context ctx(0, 0, 0); saber::SaberTimer my_time; @@ -108,27 +116,37 @@ std::vector > Worker::sy #endif net.prediction(); + my_time.end(ctx); + 
LOG(ERROR) << " exec << time: " << my_time.get_average_ms() << " ms "; + #ifdef ENABLE_OP_TIMER my_time.end(ctx); { std::lock_guard guard(_mut); _thead_id_to_prediction_times_vec_in_ms[std::this_thread::get_id()].push_back(my_time.get_average_ms()); + LOG(ERROR) << " exec << time: " << my_time.get_average_ms() << " ms "; } #endif // get outputs of graph - std::vector> ret; - for (auto out : _outputs_in_order) { - auto d_tensor_out_p = net.get_out(out); - ret.push_back(d_tensor_out_p); + std::vector::type, Dtype>> ret; + ret.resize(_outputs_in_order.size()); + for (int out_idx = 0; out_idx < _outputs_in_order.size(); out_idx++) { + auto d_tensor_out_p = net.get_out(_outputs_in_order[out_idx]); + ret[out_idx].re_alloc(d_tensor_out_p->valid_shape()); + ret[out_idx].copy_from(*d_tensor_out_p); + LOG(INFO) << "this thread: " << std::this_thread::get_id(); + for(int i=0; i< 10; i++) { + LOG(INFO) << "????? data " << ret[out_idx].mutable_data()[i]; + } } return ret; }; - return this->RunSync(task, net_ins_list); + return this->RunAsync(task, net_ins_list); } template -std::vector > Worker::sync_prediction_device(std::vector >& net_ins_list) { +std::future > > Worker::sync_prediction_device(std::vector >& net_ins_list) { auto task = [&](std::vector >& ins) -> std::vector > { auto& net = MultiThreadModel::Global().get_net(std::this_thread::get_id()); //fill the graph inputs @@ -146,7 +164,7 @@ std::vector > Worker::sy return ret; }; - return this->RunSync(task, net_ins_list); + return this->RunAsync(task, net_ins_list); } template diff --git a/framework/core/net/worker.h b/framework/core/net/worker.h index b1ed97d2e..4bd998f40 100644 --- a/framework/core/net/worker.h +++ b/framework/core/net/worker.h @@ -106,15 +106,15 @@ class Worker : public ThreadPool { * \param host net_in_list the inputs of net graph (note: the len of net_in_list should be equal to the net inputs). * \return the net graph outputs. 
*/ - std::vector > sync_prediction(\ - std::vector::type, Dtype> >& net_in_list); + std::future::type, Dtype> > > sync_prediction(\ + std::vector::type, Dtype> >& net_in_list); /** * \brief Do sync prediction in multi-thread worker useful in sync rpc server, this function need * \param device net_in_list the inputs of net graph (note: the len of net_in_list should be equal to the net inputs). * \return the net graph outputs. */ - std::vector > sync_prediction_device(\ + std::future > > sync_prediction_device(\ std::vector >& net_in_list); /** diff --git a/framework/core/singleton.cpp b/framework/core/singleton.cpp index d8a2b9c0a..83562a3da 100644 --- a/framework/core/singleton.cpp +++ b/framework/core/singleton.cpp @@ -1,4 +1,4 @@ -#include "framework/core/thread_safe_macros.h" +#include "framework/core/singleton.h" namespace anakin { diff --git a/framework/core/thread_pool.inl b/framework/core/thread_pool.inl index 3aa8bab05..d581cfbab 100644 --- a/framework/core/thread_pool.inl +++ b/framework/core/thread_pool.inl @@ -1,7 +1,7 @@ namespace anakin { -void ThreadPool::launch() { +inline void ThreadPool::launch() { for(size_t i = 0; i<_num_thread; ++i) { _workers.emplace_back( [i ,this]() { @@ -20,7 +20,7 @@ void ThreadPool::launch() { task = std::move(this->_tasks.front()); this->_tasks.pop(); } - DLOG(INFO) << " Thread (" << i <<") processing"; + LOG(WARNING) << " Thread (" << i <<") processing thread_id: " << std::this_thread::get_id(); auxiliary_funcs(); task(); } @@ -29,16 +29,16 @@ void ThreadPool::launch() { } } -void ThreadPool::stop() { +inline void ThreadPool::stop() { std::unique_lock lock(this->_mut); _stop = true; } -void ThreadPool::init() {} +inline void ThreadPool::init() {} -void ThreadPool::auxiliary_funcs() {} +inline void ThreadPool::auxiliary_funcs() {} -ThreadPool::~ThreadPool() { +inline ThreadPool::~ThreadPool() { stop(); this->_cv.notify_all(); for(auto & worker: _workers){ @@ -47,7 +47,7 @@ ThreadPool::~ThreadPool() { } template -typename 
function_traits::return_type ThreadPool::RunSync(functor function, ParamTypes ...args) +inline typename function_traits::return_type ThreadPool::RunSync(functor function, ParamTypes ...args) EXCLUSIVE_LOCKS_REQUIRED(_mut) { auto task = std::make_shared::return_type(void)> >( \ std::bind(function, std::forward(args)...) @@ -55,14 +55,14 @@ typename function_traits::return_type ThreadPool::RunSync(functor funct std::future::return_type> result = task->get_future(); { std::unique_lock lock(this->_mut); - this->_tasks.emplace( [&]() { (*task)(); } ); + this->_tasks.emplace( [=]() { (*task)(); } ); } this->_cv.notify_one(); return result.get(); } template -std::future::return_type> ThreadPool::RunAsync(functor function, ParamTypes ...args) +inline std::future::return_type> ThreadPool::RunAsync(functor function, ParamTypes ...args) EXCLUSIVE_LOCKS_REQUIRED(_mut) { auto task = std::make_shared::return_type(void)> >( \ std::bind(function, std::forward(args)...) diff --git a/framework/core/type_traits_extend.h b/framework/core/type_traits_extend.h index 0129377e0..1154780d4 100644 --- a/framework/core/type_traits_extend.h +++ b/framework/core/type_traits_extend.h @@ -157,7 +157,8 @@ struct target_host { typedef saber::NVHX86 type; }; - +template +struct ServiceRunPatternToType{}; } /* namespace anakin */ diff --git a/framework/core/types.h b/framework/core/types.h index 81abfb5ac..0444b06f1 100644 --- a/framework/core/types.h +++ b/framework/core/types.h @@ -38,6 +38,15 @@ enum class OpRunType : int { ASYNC ///< ASYNC the net exec asynchronous (for GPU, means mutli-stream) }; +/** + * \brief service run pattern + */ +enum class ServiceRunPattern: int { + SYNC, + ASYNC +}; + + /** * \brief Inner return type used by Status type. 
*/ diff --git a/framework/graph/graph.cpp b/framework/graph/graph.cpp index 54eb66cbd..2b28be0f9 100644 --- a/framework/graph/graph.cpp +++ b/framework/graph/graph.cpp @@ -138,19 +138,23 @@ Status Graph::Optimize() EXCLUSIVE_LOCKS_REQUIRED(_mut) { _nodes_exec_order = scheduler.get_exec_node_in_order(); #else // enable conv+eltwise fusion // optimization - ConvElsFusionScheduler conv_eltwise_fusion_scheduler; - conv_eltwise_fusion_scheduler.RegIOResource(_vgraph); - conv_eltwise_fusion_scheduler.Run(); - // get node exec in order - //_nodes_exec_order = conv_eltwise_fusion_scheduler.get_exec_node_in_order(); + ConvElsFusionScheduler conv_eltwise_fusion_scheduler; + conv_eltwise_fusion_scheduler.RegIOResource(_vgraph); + conv_eltwise_fusion_scheduler.Run(); + // get node exec in order + //_nodes_exec_order = conv_eltwise_fusion_scheduler.get_exec_node_in_order(); #endif - // optimization again + // optimization again + ParallScheduler para_scheduler; + para_scheduler.RegIOResource(_vgraph); + para_scheduler.Run(); + MemoryScheduler mem_scheduler; mem_scheduler.RegIOResource(_vgraph); mem_scheduler.Run(); - ParallScheduler para_scheduler; + /*ParallScheduler para_scheduler; para_scheduler.RegIOResource(_vgraph); - para_scheduler.Run(); + para_scheduler.Run();*/ // set info for graph statistics.set_info(true); diff --git a/framework/graph/graph_global_mem.h b/framework/graph/graph_global_mem.h index 5ecfe9914..8e919d3e5 100644 --- a/framework/graph/graph_global_mem.h +++ b/framework/graph/graph_global_mem.h @@ -47,7 +47,7 @@ class GraphGlobalMemBase { } /// get sum size in m-btyes - size_t get_sum_mbyte() EXCLUSIVE_LOCKS_REQUIRED(_mut) { + float get_sum_mbyte() EXCLUSIVE_LOCKS_REQUIRED(_mut) { std::unique_lock lock(this->_mut); size_t sum = 0; for (auto block_p : _int8_mem_pool) { @@ -139,7 +139,7 @@ enum INFO{ template struct Decide{ - typedef int type; + typedef float type; }; template<> @@ -164,16 +164,16 @@ struct Statistics { template struct Info_to_type {}; - 
inline void _set_info(int mem_in_mbytes, Info_to_type) { + inline void _set_info(float mem_in_mbytes, Info_to_type) { temp_mem_used = mem_in_mbytes; } - inline void _set_info(int mem_in_mbytes, Info_to_type) { + inline void _set_info(float mem_in_mbytes, Info_to_type) { original_temp_mem_used = mem_in_mbytes; } - inline void _set_info(int mem_in_mbytes, Info_to_type) { + inline void _set_info(float mem_in_mbytes, Info_to_type) { model_mem_used = mem_in_mbytes; } - inline void _set_info(int mem_in_mbytes, Info_to_type) { + inline void _set_info(float mem_in_mbytes, Info_to_type) { system_mem_used = mem_in_mbytes; } inline void _set_info(bool whether_optimized, Info_to_type) { @@ -198,13 +198,13 @@ struct Statistics { private: ///< temp_mem_used : temp memory used by anakin edge [MB].default 0 - int temp_mem_used{0}; + float temp_mem_used{0.f}; ///< original_temp_mem_used : temp memory used by old version [MB].default 0 - int original_temp_mem_used{0}; + float original_temp_mem_used{0.f}; ///< system_mem_used : system mem used by nvidia / amd GPU system resource [MB].default 0 - int system_mem_used{0}; + float system_mem_used{0.f}; ///< model_mem_used : mem used by model.default 0 - int model_mem_used{0}; + float model_mem_used{0.f}; ///< is_optimized stand for whether optimized flag.default false bool is_optimized{false}; diff --git a/framework/graph/llvm/optimizer/memory_scheduler.cpp b/framework/graph/llvm/optimizer/memory_scheduler.cpp index c836b44c7..2135f5c49 100644 --- a/framework/graph/llvm/optimizer/memory_scheduler.cpp +++ b/framework/graph/llvm/optimizer/memory_scheduler.cpp @@ -123,8 +123,8 @@ void IOBlockResource::free(std::vector& io_vec, VGraph* vgraph_p) { void IOBlockResource::lock(std::vector& io_vec) { for (auto& io_res : io_vec) { - if (has_free()) { - auto& tmp_io = _free.front(); // get active resouce + if (has_free(io_res)) { + auto tmp_io = get_free(io_res);//_free.front(); // get active resouce io_res.shared = true; if (tmp_io.shared) { @@ 
-134,7 +134,6 @@ void IOBlockResource::lock(std::vector& io_vec) { } _lock.push_back(io_res); - _free.pop_front(); } else { // alloc new io block io_res.shared = false; _lock.push_back(io_res); diff --git a/framework/graph/llvm/optimizer/memory_scheduler.h b/framework/graph/llvm/optimizer/memory_scheduler.h index e4a0e7592..3325ed066 100644 --- a/framework/graph/llvm/optimizer/memory_scheduler.h +++ b/framework/graph/llvm/optimizer/memory_scheduler.h @@ -83,7 +83,28 @@ class IOBlockResource { ~IOBlockResource() {} void free(std::vector&, VGraph*); - inline bool has_free() { return !(_free.empty()); } + inline bool has_free(io& target) { + for (auto it = _free.begin(); it != _free.end();) { + auto& io_tmp = *it; + if(target.lane == io_tmp.lane) { + return true; + } + ++it; + } + return false; + } + inline io get_free(io& target) { + for (auto it = _free.begin(); it != _free.end();) { + auto& io_tmp = *it; + if(target.lane == io_tmp.lane) { + it = _free.erase(it); + return io_tmp; + } else { + ++it; + } + } + return io(); + } bool is_same_target(io&, io&, VGraph*); void push_free(io&, VGraph*); void lock(std::vector&); diff --git a/framework/service/CMakeLists.txt b/framework/service/CMakeLists.txt new file mode 100644 index 000000000..510b1367c --- /dev/null +++ b/framework/service/CMakeLists.txt @@ -0,0 +1,13 @@ +# ---------------------------------------------------------------------------- +# Copyright (c) 2016 Baidu.com, Inc. 
All Rights Reserved +# ---------------------------------------------------------------------------- + +# used for temporary +anakin_fetch_include_recursively(${ANAKIN_SERVICE}) + +anakin_fetch_files_with_suffix(${ANAKIN_SERVICE}/api "cpp" ANAKIN_BASE_SRC) +anakin_fetch_files_with_suffix(${ANAKIN_SERVICE} "cpp" ANAKIN_BASE_SRC) + +list(APPEND ANAKIN_SRC ${ANAKIN_BASE_SRC}) +set(ANAKIN_SRC ${ANAKIN_SRC} PARENT_SCOPE) +unset(ANAKIN_BASE_SRC) diff --git a/framework/service/anakin_service.cpp b/framework/service/anakin_service.cpp new file mode 100644 index 000000000..e5701a9e2 --- /dev/null +++ b/framework/service/anakin_service.cpp @@ -0,0 +1,165 @@ +#include "framework/service/anakin_service.h" + +namespace anakin { + +namespace rpc { + +template +void AnakinService::set_device_id(int dev_id) { + _dev_id = dev_id; + saber::TargetWrapper::set_device(_dev_id); +} + +template +void AnakinService::initial(std::string model_name, + std::string model_path, + int thread_num) { + _worker_map[model_name] = std::make_shared >(model_path, + thread_num); +} + +template +void AnakinService::launch() { + for(auto it = _worker_map.begin(); it != _worker_map.end();) { + it->second->launch(); + it++; + } +} + +template +void AnakinService::register_inputs(std::string model_name, + std::vector in_names) { + _worker_map[model_name]->register_inputs(in_names); +} + +template +void AnakinService::register_outputs(std::string model_name, + std::vector out_names) { + _worker_map[model_name]->register_outputs(out_names); +} + +template +void AnakinService::Reshape(std::string model_name, + std::string in_name, + std::vector in_shape) { + _worker_map[model_name]->Reshape(in_name, in_shape); +} + +template +void AnakinService::register_interior_edges(std::string model_name, + std::string edge_start, + std::string edge_end) { + _worker_map[model_name]->register_interior_edges(edge_start, edge_end); +} + +template +inline void AnakinService::extract_request( + const RPCRequest* request, + 
std::vector::type, Dtype> >& inputs) { + for(int i = 0; i < request->inputs_size(); i++) { + LOG(INFO) << "Get " << i << "input"; + auto& io = request->inputs(i); + auto& data = io.tensor(); + auto& shape = data.shape(); + saber::Shape tensor_shape{shape[0],shape[1],shape[2],shape[3]}; + Tensor4d::type, Dtype> h_tensor; + h_tensor.re_alloc(tensor_shape); + auto* h_data = h_tensor.mutable_data(); + DLOG(INFO) <<"Check shape: " << shape[0] << " " << shape[1] << " " << shape[2] << " " < +inline void AnakinService::fill_response_data( + int request_id, + std::string model_name, + RPCResponse* response, + std::vector::type, Dtype> >& outputs) { + response->set_model(model_name); + response->set_request_id(request_id); + int count =0; + for(auto& h_out : outputs) { + LOG(INFO) << "Get " << count << " output"; + count++; + // copy to host + auto shape = h_out.valid_shape(); + // fill response + IO* output = response->add_outputs(); + Data* data = output->mutable_tensor(); + data->add_shape(shape[0]); + data->add_shape(shape[1]); + data->add_shape(shape[2]); + data->add_shape(shape[3]); + data->mutable_data()->Reserve(shape[0]*shape[1]*shape[2]*shape[3]); + for(int j=0; jadd_data(h_out.mutable_data()[j]); + } + LOG(INFO) << " output size: " <data_size(); + } +} + +template +inline void AnakinService::fill_response_exec_info(RPCResponse* response) { + auto* info = response->mutable_info(); + info->set_msg("SUC"); + DeviceStatus* status_p = info->mutable_device_status(); + status_p->set_id(_monitor.get_id()); + status_p->set_name(_monitor.get_name()); + status_p->set_temp(_monitor.get_temp()); + status_p->set_mem_free(_monitor.get_mem_free()); + status_p->set_mem_used(_monitor.get_mem_used()); + info->set_duration_in_nano_seconds(-1); +} + +#ifdef USE_CUDA +template class AnakinService; +template class AnakinService; +template class AnakinService; + +template class AnakinService; +template class AnakinService; +template class AnakinService; +#endif + +#ifdef USE_X86_PLACE 
+template class AnakinService; +template class AnakinServNet; +template class AnakinServNet; + +template class AnakinServNet; +template class AnakinServNet; +template class AnakinServNet; +#endif + +#ifdef USE_ARM_PLACE +#ifdef ANAKIN_TYPE_FP32 +template class AnakinService; +template class AnakinService; +#endif + +#ifdef ANAKIN_TYPE_FP16 +template class AnakinService; +template class AnakinService; +#endif + +#ifdef ANAKIN_TYPE_INT8 +template class AnakinService; +template class AnakinService; +#endif //int8 + +#endif //arm + + +} /* namespace rpc */ + +} /* namespace anakin */ + diff --git a/framework/service/anakin_service.h b/framework/service/anakin_service.h new file mode 100644 index 000000000..6c50ad46b --- /dev/null +++ b/framework/service/anakin_service.h @@ -0,0 +1,120 @@ +/* Copyright (c) 2018 Anakin Authors, Inc. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +#ifndef ANAKIN_SERVICE_H +#define ANAKIN_SERVICE_H + +#include + +#include "framework/service/monitor.h" +#include "framework/core/net/worker.h" +#include "framework/service/api/service.pb.h" + +namespace anakin { + +namespace rpc { + +template +class AnakinService : public RPCService { +public: + void evaluate(::google::protobuf::RpcController* controller_base, + const RPCRequest* request, + RPCResponse* response, + ::google::protobuf::Closure* done) { + _evaluate(controller_base, request, response, done, ServiceRunPatternToType()); + } + +public: + void set_device_id(int dev_id); + + void initial(std::string model_name, std::string model_path, int thread_num); + + void launch(); + + void Reshape(std::string model_name, std::string in_name, std::vector in_shape); + + void register_inputs(std::string model_name, std::vector in_names); + + void register_outputs(std::string model_name, std::vector); + + void register_interior_edges(std::string model_name, std::string edge_start, std::string edge_end); + + template + void register_aux_function(std::string model_name, functor function, ParamTypes ...args) { + _worker_map[model_name].register_aux_function(function, std::forward(args)...); + } + + template + void create_monitor(int interval_time_in_sec) { + _monitor.template create_instance(_dev_id, interval_time_in_sec); + } + +private: + void extract_request(const RPCRequest* request, + std::vector::type, Dtype> >& inputs); + void fill_response_data(int request_id, std::string model_name, + RPCResponse* response, + std::vector::type, Dtype> >& outputs); + void fill_response_exec_info(RPCResponse* response); + +private: + void _evaluate(::google::protobuf::RpcController* controller_base, + const RPCRequest* request, + RPCResponse* response, + ::google::protobuf::Closure* done, + ServiceRunPatternToType) { + // make sure that done will be invoked + brpc::ClosureGuard done_guard(done); + brpc::Controller* cntl = static_cast(controller_base); + // receive remote 
call from client. + LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from " << cntl->remote_side(); + if (!cntl->request_attachment().empty()) { + LOG(INFO) << " |-- (attached=" << cntl->request_attachment() << ")"; + } + std::string model_name = request->model(); + int request_id = request->request_id(); + LOG(INFO) <<" |-- Get model: "<::type, Dtype> > inputs; + extract_request(request, inputs); + auto ret = _worker_map[model_name]->sync_prediction(inputs); + auto results = ret.get(); + LOG(ERROR) << "do infer over! thread id: " << std::this_thread::get_id(); + fill_response_data(request_id, model_name, response, results); + fill_response_exec_info(response); + } + + void _evaluate(::google::protobuf::RpcController* controller_base, + const RPCRequest* request, + RPCResponse* response, + ::google::protobuf::Closure* done, + ServiceRunPatternToType) { + // make sure that done will be invoked + brpc::ClosureGuard done_guard(done); + brpc::Controller* cntl = static_cast(controller_base); + } + + + +private: + std::unordered_map > > _worker_map; + Monitor _monitor; + int _dev_id; +}; + +} /* namespace rpc */ + +} /* namespace anakin */ + +#endif diff --git a/framework/service/api/.gitignore b/framework/service/api/.gitignore new file mode 100644 index 000000000..0d9561969 --- /dev/null +++ b/framework/service/api/.gitignore @@ -0,0 +1,42 @@ +# Prerequisites +*.d + +# Compiled Object files +*.slo +*.lo +*.o +*.obj + +# Precompiled Headers +*.gch +*.pch + +# Compiled Dynamic libraries +*.so +*.dylib +*.dll + +# Fortran module files +*.mod +*.smod + +# Compiled Static libraries +*.lai +*.la +*.a +*.lib + +# Executables +*.exe +*.out +*.app + +# python +*.pyc + +# c++ +*.h +*.cpp +*.cc +*.pb.h +*.pb.cpp diff --git a/framework/service/api/service.proto b/framework/service/api/service.proto new file mode 100644 index 000000000..baa61f03f --- /dev/null +++ b/framework/service/api/service.proto @@ -0,0 +1,51 @@ +syntax = "proto3"; + +package anakin.rpc; + 
+option cc_generic_services = true; + +message Data { + repeated int32 shape = 1; + repeated float data = 2; +}; + +message IO { + Data tensor = 1; // input tensor +}; + +// RPC request +message RPCRequest { + bytes model = 1; + repeated IO inputs = 2; + int64 request_id = 3; // you need to set request ID,then to get async retults by request_id +}; + +message DeviceStatus { + int32 id = 1; // device id (represent as device num id) + bytes name = 2; // device name + int32 temp = 3; // device temperature Celsius degree + int32 mem_free = 4; // device memory free bytes + int32 mem_used = 5; // device memory used bytes +}; + +// RPC service execution information +message ExecutionInfo { + // additional exception message of the execution + bytes msg = 1; + // duration of this execution in nano seconds + int32 duration_in_nano_seconds = 2; + // device status + DeviceStatus device_status = 3; +}; + +// RPC response +message RPCResponse { + bytes model = 1; // model name + repeated IO outputs = 2; // evaluation output of a batch + ExecutionInfo info = 3; // the additional information of this execution + int64 request_id = 4; +}; + +service RPCService { + rpc evaluate (RPCRequest) returns (RPCResponse); +}; diff --git a/framework/service/device_info.cpp b/framework/service/device_info.cpp new file mode 100644 index 000000000..9639c5699 --- /dev/null +++ b/framework/service/device_info.cpp @@ -0,0 +1,99 @@ +#include "framework/service/device_info.h" + +namespace anakin { + +namespace rpc { + +#ifdef USE_CUDA +template<> +struct Inquiry { + ~Inquiry() { + result = nvmlShutdown(); + if (NVML_SUCCESS != result) { + LOG(FATAL) << "Failed to shutdown the nvml of device: " << nvmlErrorString(result); + } + } + + void init(int dev_id = 0) { + _dev_id = dev_id; + memory_has_inspected = false; + result = nvmlInit(); + if (NVML_SUCCESS != result) { + LOG(FATAL) <<" Failed to initialize NVML: " << nvmlErrorString(result); + } + result = nvmlDeviceGetHandleByIndex(dev_id, &device); + if 
(NVML_SUCCESS != result) { + LOG(FATAL) << " Failed to get handle for device: " << nvmlErrorString(result); + } + } + + template + typename InfoTraits::data_type get() { + LOG(WARNING) << "Target not support! "; + return InfoTraits::data_type(); + } + +private: + int _dev_id; + nvmlReturn_t result; + nvmlDevice_t device; + nvmlMemory_t memory; + bool memory_has_inspected; +}; + +template<> +typename InfoTraits::data_type Inquiry::get() { + return _dev_id; +} + +template<> +typename InfoTraits::data_type Inquiry::get() { + char name[NVML_DEVICE_NAME_BUFFER_SIZE]; + result = nvmlDeviceGetName(device, name, NVML_DEVICE_NAME_BUFFER_SIZE); + if (NVML_SUCCESS != result) { + LOG(FATAL) << "Failed to get name of device: " << nvmlErrorString(result); + } + return std::string(name); +} + +template<> +typename InfoTraits::data_type Inquiry::get() { + nvmlTemperatureSensors_t sensorType = NVML_TEMPERATURE_GPU; + unsigned int temp; + result = nvmlDeviceGetTemperature(device, sensorType, &temp); + if (NVML_SUCCESS != result) { + LOG(FATAL) << "Failed to get temperature of device: " << nvmlErrorString(result); + } + return temp; +} + +template<> +typename InfoTraits::data_type Inquiry::get() { + if(!memory_has_inspected) { + result = nvmlDeviceGetMemoryInfo(device, &memory); + if (NVML_SUCCESS != result) { + LOG(FATAL) << "Failed to get device memory info of device: " << nvmlErrorString(result); + } + memory_has_inspected = true; + } + return memory.free; +} + +template<> +typename InfoTraits::data_type Inquiry::get() { + if(!memory_has_inspected) { + result = nvmlDeviceGetMemoryInfo(device, &memory); + if (NVML_SUCCESS != result) { + LOG(FATAL) << "Failed to get device memory info of device: " << nvmlErrorString(result); + } + memory_has_inspected = true; + } + return memory.used; +} + +#endif + +} /* namespace rpc */ + +} /* namespace anakin */ + diff --git a/framework/service/device_info.h b/framework/service/device_info.h new file mode 100644 index 000000000..409999f26 --- 
/dev/null +++ b/framework/service/device_info.h @@ -0,0 +1,190 @@ +/* Copyright (c) 2018 Anakin Authors, Inc. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef ANAKIN_DEVICE_INFO_H +#define ANAKIN_DEVICE_INFO_H + +#include "anakin_config.h" + +#include +#include +#include +#include +#include +#include +#include + +#ifdef USE_CUDA +#include +#include +#include +#include // cuda driver types +#endif + +#include "utils/logger/logger.h" +#include "saber/saber_types.h" + +namespace anakin { + +namespace rpc { + +enum Info { + DEV_ID, + DEV_NAME, + DEV_TMP, + DEV_MEM_FREE, + DEV_MEM_USED, +}; + +template +struct check_same { + const static bool value = false; +}; + +template<> +struct check_same { + const static bool value = true; +}; + +template<> +struct check_same { + const static bool value = true; +}; + +template<> +struct check_same { + const static bool value = true; +}; + +template<> +struct check_same { + const static bool value = true; +}; + +template<> +struct check_same { + const static bool value = true; +}; + + +template +struct InfoTraits { + typedef float data_type; + float _val; +}; + +template<> +struct InfoTraits { + typedef std::string data_type; +}; + +template<> +struct InfoTraits { + typedef int data_type; +}; + +template +struct InfoStruct { + void _set(typename InfoTraits::data_type value) { + _val = value; + } + typename InfoTraits::data_type _get() { + return _val; + } +private: + typename InfoTraits::data_type _val; +}; + 
+template +struct Inquiry { + ~Inquiry() {} + + void init(int dev_id = 0) {} + + template + typename InfoTraits::data_type get() { + LOG(WARNING) << "Target not support! "; + return typename InfoTraits::data_type(); + } +private: + int _dev_id; +}; + +template +struct HasTarget { + const static bool value = check_same::value || HasTarget::value; +}; + +template +struct HasTarget { + const static bool value = check_same::value; +}; + +template +class DevInfo : public InfoStruct... { +public: + template + void set(typename InfoTraits::data_type value) { + std::unique_lock lock(this->_mut); + if(HasTarget::value) { + LOG(FATAL)<<" DevInfo parameter pack doesn't have target info type " << I; + } + InfoStruct::_set(value); + } + + template + typename InfoTraits::data_type get() { + if(HasTarget::value) { + LOG(ERROR)<<" DevInfo parameter pack doesn't have target info type " << I; + return typename InfoTraits::data_type(); + } + return InfoStruct::_get(); + } + + template + void inquiry(int dev_id) { + Inquiry instance; + instance.init(dev_id); + std::vector info_vec = {infos...}; + for(auto& info : info_vec) { + switch(info) { + case DEV_ID: { + set(instance.get()); + } break; + case DEV_NAME: { + set(instance.get()); + } break; + case DEV_TMP: { + set(instance.get()); + } break; + case DEV_MEM_FREE: { + set(instance.get()); + } break; + case DEV_MEM_USED: { + set(instance.get()); + } break; + default: break; + } + } + } +private: + std::mutex _mut; +}; + +} /* namespace rpc */ + +} /* namespace anakin */ + +#endif diff --git a/framework/service/monitor.cpp b/framework/service/monitor.cpp new file mode 100644 index 000000000..c059c85e6 --- /dev/null +++ b/framework/service/monitor.cpp @@ -0,0 +1,10 @@ +#include "framework/service/monitor.h" + +namespace anakin { + +namespace rpc { + +} /* namespace rpc */ + +} /* namespace anakin */ + diff --git a/framework/service/monitor.h b/framework/service/monitor.h new file mode 100644 index 000000000..3ef423c1f --- /dev/null +++ 
b/framework/service/monitor.h @@ -0,0 +1,77 @@ +/* Copyright (c) 2018 Anakin Authors, Inc. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#ifndef ANAKIN_MONITOR_H +#define ANAKIN_MONITOR_H + +#include "framework/service/device_info.h" + +namespace anakin { + +namespace rpc { + +/// monitor thread pool +template +class Monitor { +public: + Monitor(){} + ~Monitor(){} + + template + void create_instance(int dev_id, int interval_time_in_sec) { + _id = dev_id; + _monitor_thread = new std::thread([this](int dev_id, int time) { + DevInfo dev_info_pack; + std::chrono::time_point start = sys_clock::now(); + for(;;) { + double elapsed_time_ms =\ + std::chrono::duration_cast(sys_clock::now()-start).count(); + if(elapsed_time_ms > time * 1000) { + start = sys_clock::now(); + dev_info_pack.template inquiry(dev_id); + _name = dev_info_pack.template get(); + _temp = dev_info_pack.template get(); + _mem_free = dev_info_pack.template get(); + _mem_used = dev_info_pack.template get(); + } + } + }, dev_id, interval_time_in_sec); + } + + int get_id() { return _id; } + + std::string get_name() { return _name; } + + float get_temp() { return _temp; } + + float get_mem_free() { return _mem_free; } + + float get_mem_used() { return _mem_used; } + +private: + typedef std::chrono::system_clock sys_clock; + int _id{-1}; // device id (represent as device num id) + std::string _name{"unknown"}; // device name + float _temp{-1000}; // device temperature Celsius degree + float 
_mem_free{-1}; // device memory free bytes + float _mem_used{-1}; + std::thread* _monitor_thread; +}; + +} /* namespace rpc */ + +} /* namespace anakin */ + +#endif diff --git a/framework/service/service_daemon.cpp b/framework/service/service_daemon.cpp new file mode 100644 index 000000000..697c43b14 --- /dev/null +++ b/framework/service/service_daemon.cpp @@ -0,0 +1,99 @@ +#include "framework/service/service_daemon.h" + +namespace anakin { + +namespace rpc { + +void ServiceDaemon::operator()(std::function server_start, + std::vector device_list, + int server_port) { + // Our process ID and Session ID + pid_t pid, sid; + + // Fork off the parent process + pid = fork(); + if (pid < 0) { + exit(EXIT_FAILURE); + } + // exit the parent process. + if (pid > 0) { + exit(EXIT_SUCCESS); + } + + // Change the file mode mask, so we can use the files created by daemon. + umask(0); + + // Create a new SID(a new session) for the child process + sid = setsid(); + if (sid < 0) { + // Log the failure + exit(EXIT_FAILURE); + } + + // Change the current working directory + if ((chdir("/")) < 0) { + exit(EXIT_FAILURE); + } + + // Close out the standard file descriptors + //close(STDIN_FILENO); // 0 + //close(STDOUT_FILENO); // 1 + //close(STDERR_FILENO); // 2 + + // Daemon-specific initialization goes here */ + pid_t *pid_news = new pid_t[device_list.size()]; + for(;;) { + for(auto dev_id : device_list) { + if(!check_port_occupied(server_port) || !check_process_exist(pid_news[dev_id])) { + LOG(WARNING) <<" Create daemon process on device : " << dev_id; + // reaped zombie process + if(pid_news[dev_id]) waitpid(pid_news[dev_id], NULL, 0); + + pid_news[dev_id] = fork(); + // fork new process + if(pid_news[dev_id] == 0) { + prctl(PR_SET_NAME, "anakin_child_rpc_service"); + int ret = server_start(server_port, dev_id); + if(ret == 0) exit(EXIT_SUCCESS); + else exit(EXIT_FAILURE); + } + } + } + + sleep(30); // wait 30 seconds + } + exit(EXIT_SUCCESS); +} + +bool 
ServiceDaemon::check_port_occupied(int port) { + struct sockaddr_in client; + int sk; + + client.sin_family = AF_INET; + client.sin_port = htons(port); + client.sin_addr.s_addr = inet_addr("0.0.0.0"); + + sk = (int) socket(AF_INET, SOCK_STREAM, 0); + + int result = connect(sk, (struct sockaddr *) &client, sizeof(client)); + + if (result == 0) { + return true; // port is ocuupied. + } else { + return false; + } +} + +bool ServiceDaemon::check_process_exist(pid_t pid) { + if(kill(pid, 0) == -1) { + return false; + } else { + // process still exists + return true; + } +} + +} /* namespace rpc */ + +} /* namespace anakin */ + diff --git a/framework/service/service_daemon.h b/framework/service/service_daemon.h new file mode 100644 index 000000000..6adbccce8 --- /dev/null +++ b/framework/service/service_daemon.h @@ -0,0 +1,64 @@ +/* Copyright (c) 2018 Anakin Authors, Inc. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +#ifndef ANAKIN_SERVICE_DAEMON_H +#define ANAKIN_SERVICE_DAEMON_H + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "framework/service/anakin_service.h" + +namespace anakin { + +namespace rpc { + +class ServiceDaemon { +public: + ServiceDaemon() {} + ~ServiceDaemon() {} + + void operator()(std::function server_start, + std::vector device_list, + int server_port); + +private: + bool check_port_occupied(int port); + + bool check_process_exist(pid_t pid); + +private: +}; + +} /* namespace rpc */ + +} /* namespace anakin */ + +#endif diff --git a/saber/core/context.h b/saber/core/context.h index b8a916578..0a16c3bcc 100644 --- a/saber/core/context.h +++ b/saber/core/context.h @@ -43,14 +43,18 @@ class Context final{ _device_id = device_id; } if (data_stream_id >= devs[_device_id]._max_stream) { - LOG(WARNING) << "data stream index exceeds the maximum stream number, set to default stream(0)!"; + LOG(WARNING) << "data stream index("<< data_stream_id + << ") exceeds the maximum stream number("<< devs[_device_id]._max_stream + << "), set to default stream(0)!"; data_stream_id = 0; } _stream_data = devs[_device_id]._data_stream[data_stream_id]; _data_stream_id = data_stream_id; if (compute_stream_id >= devs[_device_id]._max_stream) { - LOG(WARNING) << "compute stream index exceeds the maximum stream number, set to default stream(0)!"; + LOG(WARNING) << "compute stream index(" << compute_stream_id + << ") exceeds the maximum stream number("<< devs[_device_id]._max_stream + << "), set to default stream(0)!"; compute_stream_id = 0; } _stream_compute = devs[_device_id]._compute_stream[compute_stream_id]; diff --git a/saber/core/impl/cuda/cuda_impl.cpp b/saber/core/impl/cuda/cuda_impl.cpp index 063cda12c..f01a0e308 100644 --- a/saber/core/impl/cuda/cuda_impl.cpp +++ b/saber/core/impl/cuda/cuda_impl.cpp @@ -123,6 +123,7 @@ void 
NVH_API::sync_stream(event_t& event, stream_t& stream) {} void NVH_API::sync_memcpy(void* dst, int dst_id, const void* src, int src_id, \ size_t count, __HtoH) { CUDA_CHECK(cudaMemcpy(dst, src, count, cudaMemcpyHostToHost)); + CUDA_CHECK(cudaStreamSynchronize(0)); //LOG(INFO) << "NVH, sync, H2H, size: " << count; } @@ -223,6 +224,7 @@ void NV_API::sync_memcpy(void* dst, int dst_id, const void* src, int src_id, \ size_t count, __DtoD) { if(dst_id == src_id){ CUDA_CHECK(cudaMemcpy(dst, src, count, cudaMemcpyDeviceToDevice)); + CUDA_CHECK(cudaStreamSynchronize(0)); //LOG(INFO) << "cuda, sync, D2D, size: " << count; } else{ CUDA_CHECK(cudaMemcpyPeer(dst, dst_id, src, src_id, count)); @@ -247,6 +249,7 @@ void NV_API::async_memcpy(void* dst, int dst_id, const void* src, int src_id, \ void NV_API::sync_memcpy(void* dst, int dst_id, const void* src, int src_id, \ size_t count, __HtoD) { CUDA_CHECK(cudaMemcpy(dst, src, count, cudaMemcpyHostToDevice)); + CUDA_CHECK(cudaStreamSynchronize(0)); //LOG(INFO) << "cuda, sync, H2D, size: " << count; } @@ -260,6 +263,7 @@ void NV_API::async_memcpy(void* dst, int dst_id, const void* src, int src_id, \ void NV_API::sync_memcpy(void* dst, int dst_id, const void* src, int src_id, \ size_t count, __DtoH) { CUDA_CHECK(cudaMemcpy(dst, src, count, cudaMemcpyDeviceToHost)); + CUDA_CHECK(cudaStreamSynchronize(0)); //LOG(INFO) << "cuda, sync, D2H, size: " << count; } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index c36887ca3..93d7f2d0e 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -26,6 +26,7 @@ anakin_fetch_files_with_suffix(${ANAKIN_UNIT_TEST}/framework/graph "cpp" ANAKIN_ anakin_fetch_files_with_suffix(${ANAKIN_UNIT_TEST}/framework/operators "cpp" ANAKIN_TEST_CASE_SRC) anakin_fetch_files_with_suffix(${ANAKIN_UNIT_TEST}/framework/net "cpp" ANAKIN_TEST_CASE_SRC) anakin_fetch_files_with_suffix(${ANAKIN_UNIT_TEST}/framework/lite "cpp" ANAKIN_TEST_CASE_SRC) 
+anakin_fetch_files_with_suffix(${ANAKIN_UNIT_TEST}/framework/service "cpp" ANAKIN_TEST_CASE_SRC) if(NVIDIA_GPU) diff --git a/test/framework/net/net_exec_multi_thread_test.cpp b/test/framework/net/net_exec_multi_thread_test.cpp index 6704c4324..33b0f8e66 100644 --- a/test/framework/net/net_exec_multi_thread_test.cpp +++ b/test/framework/net/net_exec_multi_thread_test.cpp @@ -15,102 +15,98 @@ using Target_H = ARM; #endif //std::string model_path = "../benchmark/CNN/models/vgg16.anakin.bin"; // std::string model_path = "/home/public/model_from_fluid/beta/demoprogram.anakin2.bin"; -std::string model_path = "benchmark/CNN/mobilenet_v2.anakin.bin"; +//std::string model_path = "benchmark/CNN/mobilenet_v2.anakin.bin"; +std::string model_path = "/home/cuichaowen/anakin2/public_model/public-caffe-model/mobilenetv12/mobilenet_v2.anakin.bin"; #ifdef USE_CUDA #if 0 -TEST(NetTest, nv_net_execute_muti_thread_sync_test) { -#if 1 // use host input - //Env::env_init(1); - LOG(WARNING) << "Sync Runing multi_threads for model: " << model_path; - Worker workers(model_path, 10); - workers.register_inputs({"input_0"}); - workers.register_outputs({"softmax_out"}); - workers.Reshape("input_0", {1, 384, 960, 3}); +TEST(NetTest, net_execute_single_test) { + Graph* graph = new Graph(); + LOG(WARNING) << "load anakin model file from " << model_path << " ..."; + // load anakin model files. 
+ auto status = graph->load(model_path); + if(!status ) { + LOG(FATAL) << " [ERROR] " << status.info(); + } - workers.launch(); + //anakin graph optimization + graph->Optimize(); - std::vector::type, AK_FLOAT> > host_tensor_p_in_list; - // get in - saber::Shape valid_shape_in({1, 384, 960, 3}); - Tensor4dPtr::type, AK_FLOAT> h_tensor_in = new Tensor4d::type, AK_FLOAT>(valid_shape_in); - float* h_data = h_tensor_in->mutable_data(); - for (int i=0; isize(); i++) { - h_data[i] = 1.0f; - } - host_tensor_p_in_list.push_back(h_tensor_in); + Net net_executer(*graph, true); + + int epoch = 2; + // do inference + Context ctx(0, 0, 0); + saber::SaberTimer my_time; + LOG(WARNING) << "EXECUTER !!!!!!!! "; + + my_time.start(ctx); - int epoch = 1000; - // Running for(int i=0; i h_tensor_in; - // get the output - auto d_tensor_p = d_tensor_p_out_list[0]; - } + auto valid_shape_in = d_tensor_in_p->valid_shape(); + for (int i=0; ifirst << " processing " << it->second.size() << " tasks"; - for (auto time_in_ms : it->second) { - LOG(INFO) << " \\__task avg time: " << time_in_ms; - } - } -#endif + h_tensor_in.re_alloc(valid_shape_in); + float* h_data = h_tensor_in.mutable_data(); -#endif + for (int i=0; icopy_from(h_tensor_in); + + net_executer.prediction(); + + auto tensor_out_0_p = net_executer.get_out("prob_out"); + + test_print(tensor_out_0_p); + } +} +#endif -#if 0 // use device input - Env::env_init(1); +#if 1 +TEST(NetTest, net_execute_muti_thread_sync_test) { LOG(WARNING) << "Sync Runing multi_threads for model: " << model_path; - Worker workers(model_path, 1); + Worker workers(model_path, 3); workers.register_inputs({"input_0"}); - workers.register_outputs({"softmax_out"}); - workers.Reshape("input_0", {1, 384, 960, 3}); + workers.register_outputs({"prob_out"}); + workers.Reshape("input_0", {1, 3, 224, 224}); workers.launch(); - std::vector::type, AK_FLOAT> > host_tensor_p_in_list; + std::vector::type, AK_FLOAT> > host_tensor_in_list; // get in - saber::Shape 
valid_shape_in({1, 384, 960, 3}); - Tensor4dPtr::type, AK_FLOAT> h_tensor_in = new Tensor4d::type, AK_FLOAT>(valid_shape_in); - float* h_data = h_tensor_in->mutable_data(); - for (int i=0; isize(); i++) { - h_data[i] = 1.0f; - } - host_tensor_p_in_list.push_back(h_tensor_in); + saber::Shape valid_shape_in({1, 3, 224, 224}); + Tensor4d::type, AK_FLOAT> h_tensor_in(valid_shape_in); - std::vector > device_tensor_p_in_list; - for (int i=0; i d_tensor_in = new Tensor4d(host_tensor_p_in_list[i]->valid_shape()); - d_tensor_in->copy_from(*(host_tensor_p_in_list[i])); - device_tensor_p_in_list.push_back(d_tensor_in); + float* h_data = h_tensor_in.mutable_data(); + for (int i=0; i ctx(0, 0, 0); - saber::SaberTimer my_time; - - my_time.start(ctx); - auto d_tensor_p_out_list = workers.sync_prediction_device(device_tensor_p_in_list); - my_time.end(ctx); - LOG(INFO)<<"muti thread single task exec time: "< workers(model_path, 10); @@ -155,6 +151,7 @@ TEST(NetTest, net_execute_muti_thread_async_test) { check.join(); } #endif + #endif int main(int argc, const char** argv){ diff --git a/test/framework/net/net_exec_nv_rnn_oneinput.cpp b/test/framework/net/net_exec_nv_rnn_oneinput.cpp index ed4b8c1f3..7c351ebe2 100644 --- a/test/framework/net/net_exec_nv_rnn_oneinput.cpp +++ b/test/framework/net/net_exec_nv_rnn_oneinput.cpp @@ -370,7 +370,7 @@ void worker_run(){ for(int i=0; i::type, AK_FLOAT> > tensor_in_vec(1); // tensor_in_vec[0]=host_tensor_p_in_list[0][i]; - workers.sync_prediction(tensor_in_vec); + //workers.sync_prediction(tensor_in_vec); // auto d_tensor_p_out_list = workers.sync_prediction(host_tensor_p_in_list); // get the output @@ -458,4 +458,4 @@ int main(int argc, const char** argv) { return 0; } -#endif \ No newline at end of file +#endif diff --git a/test/framework/net/net_exec_test.cpp b/test/framework/net/net_exec_test.cpp index 81d1616fb..6e628c9cc 100644 --- a/test/framework/net/net_exec_test.cpp +++ b/test/framework/net/net_exec_test.cpp @@ -17,8 +17,9 @@ using 
Target_H = ARM; //#define USE_DIEPSE // vgg16 -//std::string model_path = "../benchmark/CNN/models/vgg16.anakin.bin"; -std::string model_path = "/home/public/model_from_fluid/beta/demoprogram.anakin2.bin"; +// std::string model_path = "../benchmark/CNN/models/vgg16.anakin.bin"; +// std::string model_path = "/home/public/model_from_fluid/beta/demoprogram.anakin2.bin"; +std::string model_path = "/home/cuichaowen/anakin2/public_model/public-caffe-model/mobilenetv12/mobilenet_v2.anakin.bin"; #ifdef USE_CUDA #if 1 @@ -54,7 +55,7 @@ TEST(NetTest, net_execute_base_test) { #endif // get in - auto d_tensor_in_p = net_executer.get_in("input_0"); + /*auto d_tensor_in_p = net_executer.get_in("input_0"); Tensor4d h_tensor_in; auto valid_shape_in = d_tensor_in_p->valid_shape(); @@ -69,7 +70,7 @@ TEST(NetTest, net_execute_base_test) { h_data[i] = 1.0f; } - d_tensor_in_p->copy_from(h_tensor_in); + d_tensor_in_p->copy_from(h_tensor_in);*/ #ifdef USE_DIEPSE // for diepse model @@ -106,7 +107,7 @@ TEST(NetTest, net_execute_base_test) { d_tensor_in_2_p->copy_from(h_tensor_in_2); #endif - int epoch = 1; + int epoch = 10; // do inference Context ctx(0, 0, 0); saber::SaberTimer my_time; @@ -122,7 +123,31 @@ TEST(NetTest, net_execute_base_test) { //auto start = std::chrono::system_clock::now(); for(int i=0; i h_tensor_in; + + auto valid_shape_in = d_tensor_in_p->valid_shape(); + for (int i=0; icopy_from(h_tensor_in); + + //cudaStreamSynchronize(0); + //cudaDeviceSynchronize(); net_executer.prediction(); + //cudaDeviceSynchronize(); + + auto tensor_out_0_p = net_executer.get_out("prob_out"); + test_print(tensor_out_0_p); } /* // running part of model net_executer.execute_stop_at_node("relu2_2/expand"); @@ -182,11 +207,14 @@ TEST(NetTest, net_execute_base_test) { //auto tensor_out_0_p = net_executer.get_out("detection_output_0.tmp_0_out"); + // mobilenet-v2 + //auto tensor_out_0_p = net_executer.get_out("prob_out"); + + // get out result //LOG(WARNING)<< "result avg: " << 
tensor_average(tensor_out_0_p); //test_print(tensor_out_0_p); - // save the optimized model to disk. std::string save_model_path = model_path + std::string(".saved"); status = graph->save(save_model_path); @@ -265,7 +293,6 @@ TEST(NetTest, net_execute_reconstruction_test) { int main(int argc, const char** argv){ - Env::env_init(); // initial logger logger::init(argv[0]); InitTest(); diff --git a/test/framework/net/net_test.h b/test/framework/net/net_test.h index 4c1f6a51b..6c2631481 100644 --- a/test/framework/net/net_test.h +++ b/test/framework/net/net_test.h @@ -50,7 +50,7 @@ void test_print(Tensor4dPtr& out_tensor_p) { h_tensor_result.re_alloc(out_tensor_p->valid_shape()); LOG(ERROR) << "result count : " << h_tensor_result.valid_shape().count(); h_tensor_result.copy_from(*out_tensor_p); - for (int i = 0; i < h_tensor_result.valid_size(); i++) { + for (int i = 0; i < /*h_tensor_result.valid_size()*/10; i++) { LOG(INFO) << " GET OUT (" << i << ") " << h_tensor_result.mutable_data()[i]; } } diff --git a/test/framework/service/service_rpc_client.cpp b/test/framework/service/service_rpc_client.cpp new file mode 100644 index 000000000..c6ee146af --- /dev/null +++ b/test/framework/service/service_rpc_client.cpp @@ -0,0 +1,117 @@ +#include +#include "service_test.h" +#include "saber/funcs/timer.h" +#include +#include + +#if defined(USE_CUDA) +using Target = NV; +using Target_H = X86; +#elif defined(USE_X86_PLACE) +using Target = X86; +using Target_H = X86; +#elif defined(USE_ARM_PLACE) +using Target = ARM; +using Target_H = ARM; +#endif + +std::string protocol = "baidu_std"; +std::string server = "0.0.0.0:8000"; + +void fill_request(int id, RPCRequest& request) { + request.set_model("mobilenet_v2"); + request.set_request_id(id); + int batch_size = 1; + IO* input = request.add_inputs(); + Data* data = input->mutable_tensor(); + data->add_shape(batch_size); + data->add_shape(3); + data->add_shape(224); + data->add_shape(224); + float new_tmp_data[batch_size*3*224*224]; + 
for(int i=0; i < batch_size*3*224*224; i++) { + new_tmp_data[i] = 1.0f; + } + data->mutable_data()->Reserve(batch_size*3*224*224); + LOG(WARNING) << " client data_p: " << data->mutable_data()->mutable_data() << " o: " + << request.inputs(0).tensor().data().data(); + //memcpy(data->mutable_data()->mutable_data(), new_tmp_data, 3*224*224*sizeof(float)); + for(int i=0; i "<< cntl.ErrorText() < +#include "service_test.h" +#include "saber/funcs/timer.h" +#include + +#if defined(USE_CUDA) +using Target = NV; +using Target_H = X86; +#elif defined(USE_X86_PLACE) +using Target = X86; +using Target_H = X86; +#elif defined(USE_ARM_PLACE) +using Target = ARM; +using Target_H = ARM; +#endif + +std::string mobilenet_v2_model_path = "/home/cuichaowen/anakin2/public_model/public-caffe-model/mobilenetv12/mobilenet_v2.anakin.bin"; +int batchsize = 1; + +int service_start(int port, int dev_id) { + // create one server + brpc::Server server; + + // instance anakin rpc service + AnakinService rpc_service; + // device id must be set + rpc_service.set_device_id(dev_id); + + // initialize config for mobilenet v2 + rpc_service.initial("mobilenet_v2", mobilenet_v2_model_path, 3); + rpc_service.register_inputs("mobilenet_v2", {"input_0"}); + rpc_service.Reshape("mobilenet_v2", "input_0", {batchsize, 3, 224, 224}); + rpc_service.register_outputs("mobilenet_v2", {"prob_out"}); + + // create moniter for this service + rpc_service.create_monitor(30); // span 30 second + + + // launch rpc service + rpc_service.launch(); + + // add service to server + if (server.AddService(&rpc_service, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) { + LOG(ERROR) << "Fail to add service"; + return -1; + } + + // Start the server + brpc::ServerOptions options; + // Connection will be closed if there is no read/write operations during the time(s) + options.idle_timeout_sec = 600; + // Max number thread of server + options.num_threads = 10; + // Max concurrency request of server + options.max_concurrency = 300; + + if 
(server.Start(port, &options) != 0) { + LOG(ERROR) << "Fail to start Server on port "<< port << " device id " << dev_id; + return -1; + } + + // Wait until Ctrl-C is pressed, then Stop() and Join() the server + server.RunUntilAskedToQuit(); + + // server is stopped, you can release the source here + return 0; +} + +TEST(ServiceTest, Service_base_test) { + // create anakin service deamon instance + ServiceDaemon daemon_rpc; + // launch daemon service for rpc [ on device 0 and port 8000] + daemon_rpc(service_start, {0}, 8000); +} + +int main(int argc, const char** argv){ + // initial logger + logger::init(argv[0]); + InitTest(); + RUN_ALL_TESTS(argv[0]); + return 0; +} diff --git a/test/framework/service/service_test.h b/test/framework/service/service_test.h new file mode 100644 index 000000000..b4a037368 --- /dev/null +++ b/test/framework/service/service_test.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2018 Anakin Authors, Inc. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +#ifndef ANAKIN_SERVICE_TEST_H +#define ANAKIN_SERVICE_TEST_H + +#include +#include "utils/unit_test/aktest.h" +#include "utils/logger/logger.h" +#include "graph_base.h" +#include "graph.h" +#include "scheduler.h" +#include "net.h" +#include "worker.h" +#include "service_daemon.h" + +using namespace anakin; +using ::anakin::test::Test; + +using namespace anakin::rpc; + +/** + * \brief anakin service test is base Test class for anakin rpc service + */ +class ServiceTest: public Test { +public: + ServiceTest(){} + + void SetUp(){} + + void TearDown(){} + +protected: +}; + +#endif + +