Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ message ExecuteTasksReply {
repeated uint32 failed_task_id_list = 1;
}

// Request message of the Craned.ExecuteProc RPC.
message ExecuteProcRequest{
// Description of the proc that should be executed.
ProcToD proc = 1;
}

// Reply message of the Craned.ExecuteProc RPC.
message ExecuteProcReply{
// NOTE(review): presumably true when craned accepted/launched the proc and
// false otherwise -- confirm against the craned-side handler.
bool ok = 1;
}

message CreateCgroupForTasksRequest {
repeated uint32 task_id_list = 1;
repeated uint32 uid_list = 2;
Expand Down Expand Up @@ -444,13 +452,16 @@ message StreamCforedRequest {
string cfored_name = 1;
int32 pid = 2;
TaskToCtld task = 3;
// for nested task of crun
optional uint32 task_id = 4;
}

message TaskCompleteReq {
string cfored_name = 1;
uint32 task_id = 2;
TaskStatus status = 3;
InteractiveTaskType interactive_type = 4;
uint32 proc_id = 3;
TaskStatus status = 4;
InteractiveTaskType interactive_type = 5;
}

message GracefulExitReq {
Expand Down Expand Up @@ -479,7 +490,12 @@ message StreamCtldReply {
int32 pid = 1;
bool ok = 2;
uint32 task_id = 3;
string failure_reason = 4;
uint32 proc_id = 4;
string failure_reason = 5;
message NestedTaskNodes{
repeated string craned_ids = 1;
}
optional NestedTaskNodes nodes = 6;
}

message TaskResAllocatedReply {
Expand All @@ -492,6 +508,7 @@ message StreamCtldReply {

message TaskCancelRequest {
uint32 task_id = 1;
uint32 proc_id = 2;
}

message TaskCompletionAckReply {
Expand Down Expand Up @@ -529,16 +546,20 @@ message StreamCrunRequest{
message TaskReq {
int32 crun_pid = 1;
TaskToCtld task = 2;
// for nested task of crun
optional uint32 task_id = 3;
}

message TaskCompleteReq {
uint32 task_id = 1;
TaskStatus status = 2;
uint32 proc_id = 2;
TaskStatus status = 3;
}

message TaskIOForwardReq {
uint32 task_id = 1;
string msg = 2;
uint32 proc_id = 2;
string msg = 3;
}

CrunRequestType type = 1;
Expand All @@ -558,12 +579,14 @@ message StreamCforedCrunReply {
TASK_COMPLETION_ACK_REPLY = 3;
TASK_IO_FORWARD = 4;
TASK_IO_FORWARD_READY = 5;
Proc_FORWARD_END = 6;
}

message TaskIdReply {
bool ok = 1;
uint32 task_id = 2;
string failure_reason = 3;
uint32 proc_id = 3;
string failure_reason = 4;
}

message TaskResAllocatedReply {
Expand All @@ -587,6 +610,10 @@ message StreamCforedCrunReply {
string msg = 1;
}

message ProcForwardEndReply{
bool ok = 1;
}

CforedCrunReplyType type = 1 ;

oneof payload {
Expand All @@ -596,23 +623,26 @@ message StreamCforedCrunReply {
TaskCompletionAckReply payload_task_completion_ack_reply = 5;
TaskIOForwardReadyReply payload_task_io_forward_ready_reply = 6;
TaskIOForwardReply payload_task_io_forward_reply = 7;
ProcForwardEndReply payload_proc_forward_end_reply = 8;
}
}

message StreamCforedTaskIORequest {
enum CranedRequestType{
CRANED_REGISTER = 0;
CRANED_TASK_OUTPUT = 1;
CRANED_PROC_OUTPUT = 1;
CRANED_UNREGISTER = 2;
}

message CranedRegisterReq {
string craned_id = 1;
}

message CranedTaskOutputReq {
message CranedProcOutputReq {
uint32 task_id = 1;
string msg = 2;
uint32 proc_id = 2;
string msg = 3;
bool end = 4;
}

message CranedUnRegisterReq {
Expand All @@ -622,7 +652,7 @@ message StreamCforedTaskIORequest {
CranedRequestType type = 1;
oneof payload {
CranedRegisterReq payload_register_req = 2;
CranedTaskOutputReq payload_task_output_req = 3;
CranedProcOutputReq payload_proc_output_req = 3;
CranedUnRegisterReq payload_unregister_req = 4;
}
}
Expand All @@ -638,9 +668,10 @@ message StreamCforedTaskIOReply {
bool ok = 1;
}

message CranedTaskInputReq {
message CranedProcInputReq {
uint32 task_id = 1;
string msg = 2;
uint32 proc_id = 2;
string msg = 3;
}

message CranedUnregisterReply {
Expand All @@ -651,7 +682,7 @@ message StreamCforedTaskIOReply {

oneof payload {
CranedRegisterReply payload_craned_register_reply = 2;
CranedTaskInputReq payload_task_input_req = 3;
CranedProcInputReq payload_proc_input_req = 3;
CranedUnregisterReply payload_craned_unregister_reply = 4;
}
}
Expand Down Expand Up @@ -701,6 +732,7 @@ service CraneCtld {
service Craned {
/* ----------------------------------- Called from CraneCtld ---------------------------------------------------- */
rpc ExecuteTask(ExecuteTasksRequest) returns(ExecuteTasksReply);
rpc ExecuteProc(ExecuteProcRequest) returns(ExecuteProcReply);

rpc CheckTaskStatus(CheckTaskStatusRequest) returns(CheckTaskStatusReply);

Expand Down
14 changes: 14 additions & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,20 @@ enum InteractiveTaskType {
Crun = 1;
}

// Description of a single proc; this is the payload of ExecuteProcRequest.
// NOTE(review): the "ToD" suffix presumably means "sent to craned" -- confirm.
message ProcToD{
// Id of the task this proc belongs to.
uint32 task_id = 1;
// Id of the proc.
// NOTE(review): presumably unique only within task_id -- confirm.
uint32 proc_id = 2;
// Task type of the proc.
// NOTE(review): presumably decides which member of `payload` is set
// (Batch -> batch_meta, otherwise interactive_meta) -- confirm.
TaskType type = 3;
// NOTE(review): presumably asks craned to load the submitting user's
// environment before launching -- confirm.
bool get_user_env = 4;
// Environment variables for the proc, as name -> value pairs.
map<string, string> env = 5;
// NOTE(review): presumably the working directory the proc starts in -- confirm.
string cwd = 6;
// Type-specific metadata; at most one member is set.
oneof payload {
BatchTaskAdditionalMeta batch_meta = 7;
InteractiveTaskAdditionalMeta interactive_meta = 8;
}

}

message TaskToCtld {
/* -------- Fields that are set at the submission time. ------- */
google.protobuf.Duration time_limit = 1;
Expand Down
54 changes: 53 additions & 1 deletion src/CraneCtld/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ std::vector<task_id_t> CranedStub::ExecuteTasks(
return failed_task_ids;
}

// Sends an ExecuteProc RPC carrying `request` to the craned behind this stub.
//
// Returns:
//   CraneErr::kRpcFailure     - the gRPC call itself failed.
//   CraneErr::kGenericFailure - the call succeeded but craned replied with
//                               ok == false.
//   CraneErr::kOk             - the call succeeded and craned replied ok.
CraneErr CranedStub::ExecuteProc(
    const crane::grpc::ExecuteProcRequest &request) {
  using crane::grpc::ExecuteProcReply;

  ClientContext context;
  ExecuteProcReply reply;

  Status status = m_stub_->ExecuteProc(&context, request, &reply);
  if (!status.ok()) {
    CRANE_DEBUG(
        "ExecuteProc RPC for Node {} returned with status not ok: {}",
        m_craned_id_, status.error_message());
    return CraneErr::kRpcFailure;
  }

  // A successful transport does not mean the proc was launched: honor the
  // `ok` flag of the reply, the same way TerminateTasks does.
  if (!reply.ok()) {
    CRANE_DEBUG("ExecuteProc RPC for Node {} was answered with ok == false",
                m_craned_id_);
    return CraneErr::kGenericFailure;
  }

  return CraneErr::kOk;
}

CraneErr CranedStub::TerminateTasks(const std::vector<task_id_t> &task_ids) {
using crane::grpc::TerminateTasksReply;
using crane::grpc::TerminateTasksRequest;
Expand All @@ -81,7 +99,10 @@ CraneErr CranedStub::TerminateTasks(const std::vector<task_id_t> &task_ids) {
return CraneErr::kRpcFailure;
}

return CraneErr::kOk;
if (reply.ok())
return CraneErr::kOk;
else
return CraneErr::kGenericFailure;
}

CraneErr CranedStub::TerminateOrphanedTask(task_id_t task_id) {
Expand Down Expand Up @@ -295,6 +316,37 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequest(
return request;
}

// Builds the ExecuteProcRequest used only for launching a proc.
//
// `task_running` supplies the task id the proc is attached to; every other
// field (type, env, cwd, get_user_env and the type-specific meta) is copied
// from `step_submit`.
// NOTE(review): proc_id of the ProcToD is not populated here -- confirm that
// leaving it at its default is intended.
crane::grpc::ExecuteProcRequest CranedStub::NewExecuteProcRequest(
    std::unique_ptr<TaskInCtld>& task_running,
    std::unique_ptr<TaskInCtld>& step_submit) {
  crane::grpc::ExecuteProcRequest req;
  auto *proc = req.mutable_proc();

  // Fields shared by every proc type.
  proc->set_task_id(task_running->TaskId());
  proc->set_type(step_submit->type);
  proc->set_get_user_env(step_submit->get_user_env);
  proc->set_cwd(step_submit->cwd);
  proc->mutable_env()->insert(step_submit->env.begin(),
                              step_submit->env.end());

  // Anything that is not a batch step is treated as interactive.
  if (step_submit->type != crane::grpc::Batch) {
    auto &src_meta = std::get<InteractiveMetaInTask>(step_submit->meta);
    auto *dst_meta = proc->mutable_interactive_meta();
    dst_meta->set_cfored_name(src_meta.cfored_name);
    dst_meta->set_sh_script(src_meta.sh_script);
    dst_meta->set_term_env(src_meta.term_env);
    dst_meta->set_interactive_type(src_meta.interactive_type);
    return req;
  }

  auto &src_meta = std::get<BatchMetaInTask>(step_submit->meta);
  auto *dst_meta = proc->mutable_batch_meta();
  dst_meta->set_output_file_pattern(src_meta.output_file_pattern);
  dst_meta->set_error_file_pattern(src_meta.error_file_pattern);
  dst_meta->set_sh_script(src_meta.sh_script);
  return req;
}

CranedKeeper::CranedKeeper(uint32_t node_num) : m_cq_closed_(false) {
m_pmr_pool_res_ = std::make_unique<std::pmr::synchronized_pool_resource>();
m_tag_sync_allocator_ =
Expand Down
5 changes: 5 additions & 0 deletions src/CraneCtld/CranedKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ class CranedStub {
static crane::grpc::ExecuteTasksRequest NewExecuteTasksRequest(
const std::vector<TaskInCtld *> &tasks);

static crane::grpc::ExecuteProcRequest NewExecuteProcRequest(
std::unique_ptr<TaskInCtld>& task_running,
std::unique_ptr<TaskInCtld>& step_submit);

std::vector<task_id_t> ExecuteTasks(
const crane::grpc::ExecuteTasksRequest &request);
CraneErr ExecuteProc(const crane::grpc::ExecuteProcRequest &request);

CraneErr CreateCgroupForTasks(std::vector<CgroupSpec> const &cgroup_specs);

Expand Down
30 changes: 24 additions & 6 deletions src/CraneCtld/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,23 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
auto const &payload = cfored_request.payload_task_req();
auto task = std::make_unique<TaskInCtld>();
task->SetFieldsByTaskToCtld(payload.task());
if (payload.has_task_id()) {
auto result = g_task_scheduler->SubmitProc(
std::move(task), payload.task_id(), payload.pid());
const auto &[proc_id, craned_ids] = result.get();
ok = stream_writer->WriteTaskIdReply(
payload.pid(),
result::result<task_id_t, std::string>{payload.task_id()},
proc_id, craned_ids);
if (!ok) {
CRANE_ERROR(
"Failed to send msg to cfored {}. Connection is broken. "
"Exiting...",
cfored_name);
state = StreamState::kCleanData;
}
break;
}

auto &meta = std::get<InteractiveMetaInTask>(task->meta);
auto i_type = meta.interactive_type;
Expand All @@ -1016,11 +1033,12 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
writer->WriteTaskCancelRequest(task_id);
};

meta.cb_task_completed = [this, i_type, cfored_name,
writer_weak_ptr](task_id_t task_id) {
CRANE_TRACE("Sending TaskCompletionAckReply in task_completed",
task_id);
if (auto writer = writer_weak_ptr.lock(); writer)
meta.cb_task_completed = [&, cfored_name](task_id_t task_id) {
// calloc will not send TaskCompletionAckReply when task
// Complete.
// crun task will send TaskStatusChange from Craned,
if (auto writer = writer_weak_ptr.lock();
writer && meta.interactive_type == crane::grpc::Calloc)
writer->WriteTaskCompletionAckReply(task_id);
m_ctld_server_->m_mtx_.Lock();

Expand All @@ -1045,7 +1063,7 @@ grpc::Status CraneCtldServiceImpl::CforedStream(
} else {
result = result::fail(submit_result.error());
}
ok = stream_writer->WriteTaskIdReply(payload.pid(), result);
ok = stream_writer->WriteTaskIdReply(payload.pid(), result, 0, {});

if (!ok) {
CRANE_ERROR(
Expand Down
28 changes: 20 additions & 8 deletions src/CraneCtld/CtldGrpcServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ class CforedStreamWriter {
crane::grpc::StreamCforedRequest> *stream)
: m_stream_(stream), m_valid_(true) {}

bool WriteTaskIdReply(
pid_t calloc_pid,
result::result<task_id_t, std::string> res) {
bool WriteTaskIdReply(pid_t calloc_pid,
result::result<task_id_t, std::string> res,
proc_id_t proc_id,
const std::list<std::string> &craned_ids) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;

Expand All @@ -55,6 +56,10 @@ class CforedStreamWriter {
task_id_reply->set_ok(true);
task_id_reply->set_pid(calloc_pid);
task_id_reply->set_task_id(res.value());
task_id_reply->set_proc_id(proc_id);
if (!craned_ids.empty())
task_id_reply->mutable_nodes()->mutable_craned_ids()->Add(
craned_ids.begin(), craned_ids.end());
} else {
task_id_reply->set_ok(false);
task_id_reply->set_pid(calloc_pid);
Expand All @@ -64,8 +69,11 @@ class CforedStreamWriter {
return m_stream_->Write(reply);
}

bool WriteTaskResAllocReply(task_id_t task_id,
result::result<std::pair<std::string,std::list<std::string>>, std::string> res) {
bool WriteTaskResAllocReply(
task_id_t task_id,
result::result<std::pair<std::string, std::list<std::string>>,
std::string>
res) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;

Expand All @@ -76,8 +84,12 @@ class CforedStreamWriter {

if (res.has_value()) {
task_res_alloc_reply->set_ok(true);
task_res_alloc_reply->set_allocated_craned_regex(std::move(res.value().first));
std::ranges::for_each(res.value().second,[&task_res_alloc_reply](const auto& craned_id){task_res_alloc_reply->add_craned_ids(craned_id);});
task_res_alloc_reply->set_allocated_craned_regex(
std::move(res.value().first));
std::ranges::for_each(res.value().second,
[&task_res_alloc_reply](const auto &craned_id) {
task_res_alloc_reply->add_craned_ids(craned_id);
});
} else {
task_res_alloc_reply->set_ok(false);
task_res_alloc_reply->set_failure_reason(std::move(res.error()));
Expand All @@ -89,7 +101,7 @@ class CforedStreamWriter {
bool WriteTaskCompletionAckReply(task_id_t task_id) {
LockGuard guard(&m_stream_mtx_);
if (!m_valid_) return false;
CRANE_TRACE("Sending TaskCompletionAckReply to cfored of task id {}",task_id);

StreamCtldReply reply;
reply.set_type(StreamCtldReply::TASK_COMPLETION_ACK_REPLY);

Expand Down
Loading