Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/bootstrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ static ncclResult_t setFilesLimit() {
static ncclResult_t rootSend(union ncclSocketAddress* addr, uint64_t magic, union ringConnectInfo* info) {
ncclResult_t res = ncclSuccess;
struct ncclSocket sock;
NCCLCHECKGOTO(ncclSocketInit(&sock, addr, magic, ncclSocketTypeBootstrap), res, fail);
NCCLCHECKGOTO(ncclSocketInit(&sock, &bootstrapNetIfAddr, addr, magic, ncclSocketTypeBootstrap), res, fail);
NCCLCHECKGOTO(ncclSocketConnect(&sock), res, fail);
NCCLCHECKGOTO(socketSend(&sock, info, sizeof(union ringConnectInfo)), res, fail);
NCCLCHECK(ncclSocketClose(&sock));
Expand Down Expand Up @@ -381,7 +381,7 @@ ncclResult_t bootstrapCreateRoot(struct ncclBootstrapHandle* handle, bool idFrom
pthread_t thread;

NCCLCHECK(ncclCalloc(&listenSock, 1));
NCCLCHECKGOTO(ncclSocketInit(listenSock, &handle->addr, handle->magic, ncclSocketTypeBootstrap, NULL, 0), ret, fail);
NCCLCHECKGOTO(ncclSocketInit(listenSock, &handle->addr, NULL, handle->magic, ncclSocketTypeBootstrap, NULL, 0), ret, fail);
NCCLCHECKGOTO(ncclSocketListen(listenSock), ret, fail);
NCCLCHECKGOTO(ncclSocketGetAddr(listenSock, &handle->addr), ret, fail);

Expand Down Expand Up @@ -470,7 +470,7 @@ struct bootstrapState {
// helper functions
static ncclResult_t createListenSocket(struct ncclComm* comm, uint64_t magic, struct ncclSocket* socket, union ncclSocketAddress* addr,
ncclSocketType type) {
NCCLCHECK(ncclSocketInit(socket, &bootstrapNetIfAddr, magic, type, comm->abortFlag));
NCCLCHECK(ncclSocketInit(socket, &bootstrapNetIfAddr, NULL, magic, type, comm->abortFlag));
NCCLCHECK(ncclSocketListen(socket));
NCCLCHECK(ncclSocketGetAddr(socket, addr));
return ncclSuccess;
Expand Down Expand Up @@ -550,7 +550,7 @@ static ncclResult_t netRingConnect(ncclNet_t* net, struct bootstrapListen_t* lis
return ncclSuccess;
}
static ncclResult_t socketRingConnect(ncclSocketAddress* addr, struct ncclSocket* sendSocket, struct ncclSocket* listenSock, struct ncclSocket* recvSocket, uint64_t magic, volatile uint32_t* abortFlag) {
NCCLCHECK(ncclSocketInit(sendSocket, addr, magic, ncclSocketTypeBootstrap, abortFlag));
NCCLCHECK(ncclSocketInit(sendSocket, &bootstrapNetIfAddr, addr, magic, ncclSocketTypeBootstrap, abortFlag));
NCCLCHECK(ncclSocketConnect(sendSocket));
NCCLCHECK(ncclSocketInit(recvSocket));
NCCLCHECK(ncclSocketAccept(recvSocket, listenSock));
Expand Down Expand Up @@ -604,7 +604,7 @@ static ncclResult_t ringAllInfo(struct ncclComm* comm, struct bootstrapState* st
static ncclResult_t sendToRoot(struct ncclBootstrapHandle* handle, struct ncclComm* comm, struct extInfo* info) {
ncclResult_t ret = ncclSuccess;
struct ncclSocket sock;
NCCLCHECK(ncclSocketInit(&sock, &handle->addr, handle->magic, ncclSocketTypeBootstrap, comm->abortFlag));
NCCLCHECK(ncclSocketInit(&sock, &bootstrapNetIfAddr, &handle->addr, handle->magic, ncclSocketTypeBootstrap, comm->abortFlag));
NCCLCHECKGOTO(ncclSocketConnect(&sock), ret, fail);
NCCLCHECKGOTO(socketSend(&sock, info, sizeof(struct extInfo)), ret, fail);
NCCLCHECK(ncclSocketClose(&sock));
Expand Down Expand Up @@ -867,7 +867,7 @@ static ncclResult_t socketConnect(void* commState, int peer, int tag, struct ncc
struct bootstrapState* state = (struct bootstrapState*)commState;

struct socketAckInfo ack = (struct socketAckInfo){.rank = state->rank, .tag = tag};
NCCLCHECKGOTO(ncclSocketInit(sock, state->peerP2pAddresses + peer, state->magic, ncclSocketTypeBootstrap, state->abortFlag), ret, fail);
NCCLCHECKGOTO(ncclSocketInit(sock, &bootstrapNetIfAddr, state->peerP2pAddresses + peer, state->magic, ncclSocketTypeBootstrap, state->abortFlag), ret, fail);
NCCLCHECKGOTO(ncclSocketConnect(sock), ret, fail);
NCCLCHECKGOTO(socketSend(sock, &ack, sizeof(struct socketAckInfo)), ret, fail);
return ncclSuccess;
Expand Down
10 changes: 6 additions & 4 deletions src/include/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ struct ncclSocket {
int acceptFd;
int errorRetries;
union ncclSocketAddress addr;
union ncclSocketAddress peerAddr;
int family;
volatile uint32_t* abortFlag;
int asyncFlag;
enum ncclSocketState state;
Expand All @@ -75,15 +77,15 @@ ncclResult_t ncclFindInterfaces(char* ifNames, union ncclSocketAddress *ifAddrs,
int* nIfs);

// Initialize a socket
ncclResult_t ncclSocketInit(struct ncclSocket* sock, const union ncclSocketAddress* addr = NULL, uint64_t magic = NCCL_SOCKET_MAGIC, enum ncclSocketType type = ncclSocketTypeUnknown, volatile uint32_t* abortFlag = NULL, int asyncFlag = 0, int customRetry = 0);
ncclResult_t ncclSocketInit(struct ncclSocket* sock, const union ncclSocketAddress* addr = NULL, const union ncclSocketAddress* peerAddr = NULL, uint64_t magic = NCCL_SOCKET_MAGIC, enum ncclSocketType type = ncclSocketTypeUnknown, volatile uint32_t* abortFlag = NULL, int asyncFlag = 0, int customRetry = 0);
// Create a listening socket. sock->addr can be pre-filled with IP & port info. sock->fd is set after a successful call
ncclResult_t ncclSocketListen(struct ncclSocket* sock);
ncclResult_t ncclSocketGetAddr(struct ncclSocket* sock, union ncclSocketAddress* addr);
// Connect to sock->addr. sock->fd is set after a successful call.
ncclResult_t ncclSocketGetAddr(struct ncclSocket* sock, union ncclSocketAddress* addr, bool isPeer = false);
// Connect to sock->peerAddr. sock->fd is set after a successful call.
ncclResult_t ncclSocketConnect(struct ncclSocket* sock);
// Return socket connection state.
ncclResult_t ncclSocketReady(struct ncclSocket* sock, int *running);
// Accept an incoming connection from listenSock->fd and keep the file descriptor in sock->fd, with the remote side IP/port in sock->addr.
// Accept an incoming connection from listenSock->fd and keep the file descriptor in sock->fd, with the remote side IP/port in sock->peerAddr.
ncclResult_t ncclSocketAccept(struct ncclSocket* sock, struct ncclSocket* ulistenSock);
ncclResult_t ncclSocketGetFd(struct ncclSocket* sock, int* fd);
ncclResult_t ncclSocketSetFd(int fd, struct ncclSocket* sock);
Expand Down
67 changes: 49 additions & 18 deletions src/misc/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ static ncclResult_t socketProgressOpt(int op, struct ncclSocket* sock, void* ptr
}
if (errno != EINTR && errno != EWOULDBLOCK && errno != EAGAIN) {
WARN("socketProgressOpt: Call to %s %s failed : %s", (op == NCCL_SOCKET_RECV ? "recv from" : "send to"),
ncclSocketToString(&sock->addr, line), strerror(errno));
ncclSocketToString(&sock->peerAddr, line), strerror(errno));
return ncclRemoteError;
} else {
bytes = 0;
Expand All @@ -69,7 +69,7 @@ static ncclResult_t socketProgress(int op, struct ncclSocket* sock, void* ptr, i
} else {
char line[SOCKET_NAME_MAXLEN+1];
WARN("socketProgress: Connection closed by remote peer %s",
ncclSocketToString(&sock->addr, line, /*numericHostForm*/0));
ncclSocketToString(&sock->peerAddr, line, /*numericHostForm*/0));
return ncclRemoteError;
}
}
Expand Down Expand Up @@ -425,19 +425,22 @@ ncclResult_t ncclSocketListen(struct ncclSocket* sock) {
return ncclSuccess;
}

// Copy one of the addresses stored in `sock` into the caller-provided `addr`.
//
// NOTE(review): this span comes from a rendered diff without +/- markers. The
// first signature line and the first unconditional memcpy appear to be the
// removed (pre-change) lines; the second signature and the isPeer if/else are
// their replacements -- confirm against the real file before compiling.
//
// Parameters (as used by the replacement version):
//   sock   - socket to query; NULL is rejected with a warning.
//   addr   - destination buffer, overwritten with sizeof(union ncclSocketAddress)
//            bytes. It is not NULL-checked here.
//   isPeer - when true, sock->peerAddr is copied; otherwise sock->addr is copied.
//
// Returns ncclInvalidArgument when sock is NULL, ncclInternalError when
// sock->state is not ncclSocketStateReady, and ncclSuccess after the copy.
ncclResult_t ncclSocketGetAddr(struct ncclSocket* sock, union ncclSocketAddress* addr) {
ncclResult_t ncclSocketGetAddr(struct ncclSocket* sock, union ncclSocketAddress* addr, bool isPeer) {
if (sock == NULL) {
WARN("ncclSocketGetAddr: pass NULL socket");
return ncclInvalidArgument;
}
// Addresses are only handed out once the socket is in the ready state.
if (sock->state != ncclSocketStateReady) return ncclInternalError;
memcpy(addr, &sock->addr, sizeof(union ncclSocketAddress));
if (isPeer)
memcpy(addr, &sock->peerAddr, sizeof(union ncclSocketAddress));
else
memcpy(addr, &sock->addr, sizeof(union ncclSocketAddress));
return ncclSuccess;
}

static ncclResult_t socketTryAccept(struct ncclSocket* sock) {
socklen_t socklen = sizeof(union ncclSocketAddress);
sock->fd = accept(sock->acceptFd, (struct sockaddr*)&sock->addr, &socklen);
sock->fd = accept(sock->acceptFd, (struct sockaddr*)&sock->peerAddr, &socklen);
if (sock->fd != -1) {
sock->state = ncclSocketStateAccepted;
} else if (errno == ENETDOWN || errno == EPROTO || errno == ENOPROTOOPT || errno == EHOSTDOWN ||
Expand Down Expand Up @@ -545,7 +548,7 @@ static ncclResult_t socketFinalizeAccept(struct ncclSocket* sock) {
static ncclResult_t socketResetFd(struct ncclSocket* sock) {
ncclResult_t ret = ncclSuccess;
int fd = -1;
SYSCHECKGOTO(fd = socket(sock->addr.sa.sa_family, SOCK_STREAM, 0), "socket", ret, cleanup);
SYSCHECKGOTO(fd = socket(sock->family, SOCK_STREAM, 0), "socket", ret, cleanup);
// if sock->fd is valid, close it and reuse its number
if (sock->fd != -1) {
SYSCHECKGOTO(dup2(fd, sock->fd), "dup2", ret, cleanup);
Expand Down Expand Up @@ -589,15 +592,15 @@ static ncclResult_t socketConnectCheck(struct ncclSocket* sock, int errCode, con
sock->state = ncclSocketStateConnecting;
} else {
sock->state = ncclSocketStateError;
WARN("%s: connect to %s failed : %s", funcName, ncclSocketToString(&sock->addr, line), strerror(errCode));
WARN("%s: connect to %s failed : %s", funcName, ncclSocketToString(&sock->peerAddr, line), strerror(errCode));
return ncclSystemError;
}
return ncclSuccess;
}

// Issue connect() on sock->fd and let socketConnectCheck() interpret the outcome.
//
// NOTE(review): this span comes from a rendered diff without +/- markers. The
// two `int ret = connect(...)` lines appear to be the removed line (target
// sock->addr) followed by its replacement (target sock->peerAddr) -- only one
// of them exists in the real file; confirm before compiling.
//
// Returns whatever socketConnectCheck() returns for the observed error code.
static ncclResult_t socketStartConnect(struct ncclSocket* sock) {
/* blocking/non-blocking connect() is determined by asyncFlag. */
int ret = connect(sock->fd, &sock->addr.sa, sock->salen);
int ret = connect(sock->fd, &sock->peerAddr.sa, sock->salen);
// errno is forwarded only when connect() reported failure (-1); otherwise 0 is passed.
return socketConnectCheck(sock, (ret == -1) ? errno : 0, __func__);
}

Expand Down Expand Up @@ -695,6 +698,7 @@ ncclResult_t ncclSocketReady(struct ncclSocket* sock, int *running) {
ncclResult_t ncclSocketConnect(struct ncclSocket* sock) {
#ifdef ENABLE_TRACE
char line[SOCKET_NAME_MAXLEN+1];
char linePeer[SOCKET_NAME_MAXLEN+1];
#endif

if (sock == NULL) {
Expand All @@ -711,7 +715,15 @@ ncclResult_t ncclSocketConnect(struct ncclSocket* sock) {
if (sock->state == ncclSocketStateError) return ncclRemoteError;
return ncclInternalError;
}
TRACE(NCCL_INIT|NCCL_NET,"Connecting to socket %s", ncclSocketToString(&sock->addr, line));
SYSCHECK(bind(sock->fd, &sock->addr.sa, sock->salen), "bind");

/* Get the assigned Port */
socklen_t size = sock->salen;
SYSCHECK(getsockname(sock->fd, &sock->addr.sa, &size), "getsockname");

#ifdef ENABLE_TRACE
TRACE(NCCL_INIT|NCCL_NET,"Connecting to socket local addr: %s, peer addr: %s", ncclSocketToString(&sock->addr, line), ncclSocketToString(&sock->peerAddr, linePeer));
#endif

sock->state = ncclSocketStateConnecting;
sock->finalizeCounter = 0;
Expand Down Expand Up @@ -791,8 +803,9 @@ ncclResult_t ncclSocketAccept(struct ncclSocket* sock, struct ncclSocket* listen
return ret;
}

ncclResult_t ncclSocketInit(struct ncclSocket* sock, const union ncclSocketAddress* addr, uint64_t magic, enum ncclSocketType type, volatile uint32_t* abortFlag, int asyncFlag, int customRetry) {
ncclResult_t ncclSocketInit(struct ncclSocket* sock, const union ncclSocketAddress* addr, const union ncclSocketAddress* peerAddr, uint64_t magic, enum ncclSocketType type, volatile uint32_t* abortFlag, int asyncFlag, int customRetry) {
ncclResult_t ret = ncclSuccess;
int family = -1;

if (sock == NULL) goto exit;
sock->errorRetries = 0;
Expand All @@ -804,24 +817,42 @@ ncclResult_t ncclSocketInit(struct ncclSocket* sock, const union ncclSocketAddre
sock->fd = -1;
sock->acceptFd = -1;
sock->customRetry = customRetry;
sock->family = -1;

if (addr) {
/* IPv4/IPv6 support */
int family;
memcpy(&sock->addr, addr, sizeof(union ncclSocketAddress));
family = sock->addr.sa.sa_family;
} else {
memset(&sock->addr, 0, sizeof(union ncclSocketAddress));
}
if (peerAddr) {
memcpy(&sock->peerAddr, peerAddr, sizeof(union ncclSocketAddress));
} else {
memset(&sock->peerAddr, 0, sizeof(union ncclSocketAddress));
}
if (addr && peerAddr) {
if (addr->sa.sa_family != peerAddr->sa.sa_family) {
WARN("ncclSocketInit: local address and peer address family should be the same");
ret = ncclInternalError;
goto exit;
}
family = addr->sa.sa_family;
} else if (addr) {
family = addr->sa.sa_family;
} else if (peerAddr) {
family = peerAddr->sa.sa_family;
}
if (addr || peerAddr) {
/* IPv4/IPv6 support */
if (family != AF_INET && family != AF_INET6) {
char line[SOCKET_NAME_MAXLEN+1];
WARN("ncclSocketInit: connecting to address %s with family %d is neither AF_INET(%d) nor AF_INET6(%d)",
ncclSocketToString(&sock->addr, line), family, AF_INET, AF_INET6);
WARN("ncclSocketInit: socket address family %d is neither AF_INET(%d) nor AF_INET6(%d)",
family, AF_INET, AF_INET6);
ret = ncclInternalError;
goto exit;
}
sock->family = family;
sock->salen = (family == AF_INET) ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
// in case of error, we close the fd before returning as it's unclear if the caller has to use ncclSocketClose for cleanup
NCCLCHECKGOTO(socketResetFd(sock), ret, fail);
} else {
memset(&sock->addr, 0, sizeof(union ncclSocketAddress));
}
exit:
return ret;
Expand Down
4 changes: 2 additions & 2 deletions src/proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,7 @@ ncclResult_t ncclProxyConnect(struct ncclComm* comm, int transport, int send, in
sock = sharedProxyState->peerSocks + proxyConn->tpLocalRank;
NCCLCHECK(ncclSocketReady(sock, &ready));
if (!ready) {
NCCLCHECK(ncclSocketInit(sock, sharedProxyState->peerAddresses+proxyConn->tpRank, comm->sharedRes->magic, ncclSocketTypeProxy, comm->abortFlag));
NCCLCHECK(ncclSocketInit(sock, NULL, sharedProxyState->peerAddresses+proxyConn->tpRank, comm->sharedRes->magic, ncclSocketTypeProxy, comm->abortFlag));
NCCLCHECK(ncclSocketConnect(sock));
}

Expand Down Expand Up @@ -1855,7 +1855,7 @@ ncclResult_t ncclProxyStop(struct ncclComm* comm) {
// We need to send a ncclProxyMsgStop message to our own proxy
struct ncclSocket sock;
int type = ncclProxyMsgStop;
NCCLCHECK(ncclSocketInit(&sock, sharedProxyState->peerAddresses + comm->topParentRanks[comm->rank], comm->sharedRes->magic, ncclSocketTypeProxy, comm->abortFlag));
NCCLCHECK(ncclSocketInit(&sock, NULL, sharedProxyState->peerAddresses + comm->topParentRanks[comm->rank], comm->sharedRes->magic, ncclSocketTypeProxy, comm->abortFlag));
if (ncclSocketConnect(&sock) == ncclSuccess) {
(void)ncclSocketSend(&sock, &type, sizeof(int));
}
Expand Down
2 changes: 1 addition & 1 deletion src/ras/collectives.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ ncclResult_t rasMsgHandleCollResp(struct rasMsg* msg, struct rasSocket* sock) {
if (coll == nullptr) {
INFO(NCCL_RAS, "RAS failed to find a matching ongoing collective for response %s:%ld from %s!",
ncclSocketToString(&msg->collResp.rootAddr, line), msg->collResp.rootId,
ncclSocketToString(&sock->sock.addr, rasLine));
ncclSocketToString(&sock->sock.peerAddr, rasLine));
goto exit;
}

Expand Down
2 changes: 1 addition & 1 deletion src/ras/peers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ ncclResult_t rasMsgHandlePeersUpdate(struct rasMsg* msg, struct rasSocket* sock)
bool updatePeers, updateDeadPeers;

INFO(NCCL_RAS, "RAS handling peersUpdate from %s (peersHash 0x%lx, deadPeersHash 0x%lx, nPeers %d, nDeadPeers %d)",
ncclSocketToString(&sock->sock.addr, rasLine), msg->peersUpdate.peersHash, msg->peersUpdate.deadPeersHash,
ncclSocketToString(&sock->sock.peerAddr, rasLine), msg->peersUpdate.peersHash, msg->peersUpdate.deadPeersHash,
msg->peersUpdate.nPeers, msg->peersUpdate.nDeadPeers);
INFO(NCCL_RAS, "RAS my old rasPeersHash 0x%lx, rasDeadPeersHash 0x%lx, nRasPeers %d, nRasDeadPeers %d",
rasPeersHash, rasDeadPeersHash, nRasPeers, nRasDeadPeers);
Expand Down
16 changes: 8 additions & 8 deletions src/ras/ras.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ ncclResult_t ncclRasCommInit(struct ncclComm* comm, struct rasRankInit* myRank)

memcpy(&addr, &myRank->addr, sizeof(addr));
(addr.sa.sa_family == AF_INET ? addr.sin.sin_port : addr.sin6.sin6_port) = htons(0);
NCCLCHECKGOTO(ncclSocketInit(&rasNetListeningSocket, &addr, NCCL_SOCKET_MAGIC, ncclSocketTypeRasNetwork,
NCCLCHECKGOTO(ncclSocketInit(&rasNetListeningSocket, &addr, NULL, NCCL_SOCKET_MAGIC, ncclSocketTypeRasNetwork,
/*abortFlag*/nullptr, /*asyncFlag*/1), ret, fail);
NCCLCHECKGOTO(ncclSocketListen(&rasNetListeningSocket), ret, fail);
INFO(NCCL_RAS, "RAS network listening socket at %s",
Expand Down Expand Up @@ -405,7 +405,7 @@ ncclResult_t rasMsgHandle(struct rasMsg* msg, struct rasSocket* sock) {
} else if (msg->type == RAS_MSG_COLLRESP) {
NCCLCHECK(rasMsgHandleCollResp(msg, sock));
} else {
WARN("RAS received unknown message type (%d) from %s", msg->type, ncclSocketToString(&sock->sock.addr, rasLine));
WARN("RAS received unknown message type (%d) from %s", msg->type, ncclSocketToString(&sock->sock.peerAddr, rasLine));
return ncclInternalError;
}

Expand All @@ -422,13 +422,13 @@ static ncclResult_t rasMsgHandleConnInit(const struct rasMsg* msg, struct rasSoc
char line[SOCKET_NAME_MAXLEN+1];

INFO(NCCL_RAS, "RAS handling connInit from %s (version %d, listeningAddr %s, peersHash 0x%lx, deadPeersHash 0x%lx)",
ncclSocketToString(&sock->sock.addr, rasLine), msg->connInit.ncclVersion,
ncclSocketToString(&sock->sock.peerAddr, rasLine), msg->connInit.ncclVersion,
ncclSocketToString(&msg->connInit.listeningAddr, line), msg->connInit.peersHash, msg->connInit.deadPeersHash);

if (msg->connInit.ncclVersion != NCCL_VERSION_CODE) {
// Close any such sockets immediately! This is basically unrecoverable...
WARN("NCCL version mismatch with remote peer %s (local: %d, remote %d)",
ncclSocketToString(&sock->sock.addr, rasLine), NCCL_VERSION_CODE, msg->connInit.ncclVersion);
ncclSocketToString(&sock->sock.peerAddr, rasLine), NCCL_VERSION_CODE, msg->connInit.ncclVersion);
rasNetSendNack(sock);
rasSocketTerminate(sock, /*finalize*/true);
ret = ncclInvalidUsage;
Expand Down Expand Up @@ -482,7 +482,7 @@ static ncclResult_t rasMsgHandleConnInit(const struct rasMsg* msg, struct rasSoc

conn->sock = sock;
sock->conn = conn;
memcpy(&sock->sock.addr, &msg->connInit.listeningAddr, sizeof(sock->sock.addr));
memcpy(&sock->sock.peerAddr, &msg->connInit.listeningAddr, sizeof(sock->sock.peerAddr));

// Make sure that the connection is part of the right links forming the RAS network. At this point we only
// update the expected (non-external) connections; external ones will be added during keep-alive handling.
Expand Down Expand Up @@ -518,13 +518,13 @@ static ncclResult_t rasMsgHandleConnInit(const struct rasMsg* msg, struct rasSoc
// Handles the second message sent over a RAS socket as part of the handshake.
static ncclResult_t rasMsgHandleConnInitAck(const struct rasMsg* msg, struct rasSocket* sock) {
INFO(NCCL_RAS, "RAS handling connInitAck from %s (nack %d)",
ncclSocketToString(&sock->sock.addr, rasLine), msg->connInitAck.nack);
ncclSocketToString(&sock->sock.peerAddr, rasLine), msg->connInitAck.nack);

if (msg->connInitAck.nack) {
// The remote peer doesn't want to talk to us. The easiest way to prevent it is by declaring it dead.
// We make a copy of the address because rasConnDisconnect will terminate the rasSocket.
union ncclSocketAddress addr;
memcpy(&addr, &sock->sock.addr, sizeof(addr));
memcpy(&addr, &sock->sock.peerAddr, sizeof(addr));
rasConnDisconnect(&addr);
(void)rasPeerDeclareDead(&addr);

Expand Down Expand Up @@ -563,7 +563,7 @@ static ncclResult_t rasNetSendNack(struct rasSocket* sock) {
int closed = 0;
int offset;

INFO(NCCL_RAS, "RAS sending NACK to %s", ncclSocketToString(&sock->sock.addr, rasLine));
INFO(NCCL_RAS, "RAS sending NACK to %s", ncclSocketToString(&sock->sock.peerAddr, rasLine));

memset(&msg, '\0', sizeof(msg));
msg.type = RAS_MSG_CONNINITACK;
Expand Down
Loading