1616
1717NCCL_PARAM (RetryCnt, " SOCKET_RETRY_CNT" , 34 );
1818NCCL_PARAM (RetryTimeOut, " SOCKET_RETRY_SLEEP_MSEC" , 100 );
19+ NCCL_PARAM (PollTimeOut, " SOCKET_POLL_TIMEOUT_MSEC" , 0 );
20+
1921static void msleep (unsigned int time_msec) {
2022 const long c_1e6 = 1e6 ;
2123 struct timespec tv = (struct timespec ){
@@ -25,6 +27,14 @@ static void msleep(unsigned int time_msec) {
2527 nanosleep (&tv, NULL );
2628}
2729
30+ static void pollSocket (int fd, int op) {
31+ struct pollfd pfd;
32+ pfd.fd = fd;
33+ pfd.events = (op == NCCL_SOCKET_RECV) ? POLLIN : POLLOUT;
34+ pfd.revents = 0 ;
35+ poll (&pfd, 1 , ncclParamPollTimeOut ());
36+ }
37+
2838static ncclResult_t socketProgressOpt (int op, struct ncclSocket * sock, void * ptr, int size, int * offset, int block, int * closed) {
2939 int bytes = 0 ;
3040 *closed = 0 ;
@@ -77,8 +87,12 @@ static ncclResult_t socketProgress(int op, struct ncclSocket* sock, void* ptr, i
7787}
7888
7989static ncclResult_t socketWait (int op, struct ncclSocket * sock, void * ptr, int size, int * offset) {
80- while (*offset < size)
90+ while (*offset < size) {
8191 NCCLCHECK (socketProgress (op, sock, ptr, size, offset));
92+ // If we have more data to read or write, use the poll system call to wait
93+ // until the socket becomes readable or writable again.
94+ if ((*offset < size) && ncclParamPollTimeOut ()) pollSocket (sock->fd , op);
95+ }
8296 return ncclSuccess;
8397}
8498
0 commit comments