Skip to content
This repository was archived by the owner on Dec 20, 2022. It is now read-only.

Commit df827b0

Browse files
authored
Merge pull request #15 from Mellanox/release-3.1
Release 3.1
2 parents 8476067 + 4545823 commit df827b0

File tree

12 files changed

+128
-163
lines changed

12 files changed

+128
-163
lines changed

README.md

Lines changed: 10 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -34,7 +34,7 @@ Mellanox ConnectX-5 network adapter with 100GbE RoCE fabric, connected with a Me
3434
For more information on configuration, performance tuning and troubleshooting, please visit the [SparkRDMA GitHub Wiki](https://github.com/Mellanox/SparkRDMA/wiki)
3535

3636
## Runtime requirements
37-
* Apache Spark 2.0.0/2.1.0/2.2.0/2.3.0
37+
* Apache Spark 2.0.0/2.1.0/2.2.0/2.3.0/2.4.0
3838
* Java 8
3939
* An RDMA-supported network, e.g. RoCE or Infiniband
4040

@@ -45,21 +45,22 @@ Please use the ["Releases"](https://github.com/Mellanox/SparkRDMA/releases) page
4545
<br>If you would like to build the project yourself, please refer to the ["Build"](https://github.com/Mellanox/SparkRDMA#build) section below.
4646

4747
The pre-built binaries are packed as an archive that contains the following files:
48-
* spark-rdma-3.0-for-spark-2.0.0-jar-with-dependencies.jar
49-
* spark-rdma-3.0-for-spark-2.1.0-jar-with-dependencies.jar
50-
* spark-rdma-3.0-for-spark-2.2.0-jar-with-dependencies.jar
51-
* spark-rdma-3.0-for-spark-2.3.0-jar-with-dependencies.jar
48+
* spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar
49+
* spark-rdma-3.1-for-spark-2.1.0-jar-with-dependencies.jar
50+
* spark-rdma-3.1-for-spark-2.2.0-jar-with-dependencies.jar
51+
* spark-rdma-3.1-for-spark-2.3.0-jar-with-dependencies.jar
52+
* spark-rdma-3.1-for-spark-2.4.0-jar-with-dependencies.jar
5253
* libdisni.so
5354

5455
libdisni.so **must** be in `java.library.path` on every Spark Master and Worker (usually in /usr/lib)
5556

5657
### Configuration
5758

5859
Provide Spark the location of the SparkRDMA plugin jars by using the extraClassPath option. For standalone mode this can
59-
be added to either spark-defaults.conf or any runtime configuration file. For client mode this **must** be added to spark-defaults.conf. For Spark 2.0.0 (Replace with 2.1.0, 2.2.0 or 2.3.0 according to your Spark version):
60+
be added to either spark-defaults.conf or any runtime configuration file. For client mode this **must** be added to spark-defaults.conf. For Spark 2.0.0 (Replace with 2.1.0, 2.2.0, 2.3.0, 2.4.0 according to your Spark version):
6061
```
61-
spark.driver.extraClassPath /path/to/SparkRDMA/target/spark-rdma-2.0-for-spark-2.0.0-jar-with-dependencies.jar
62-
spark.executor.extraClassPath /path/to/SparkRDMA/target/spark-rdma-2.0-for-spark-2.0.0-jar-with-dependencies.jar
62+
spark.driver.extraClassPath /path/to/SparkRDMA/target/spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar
63+
spark.executor.extraClassPath /path/to/SparkRDMA/target/spark-rdma-3.1-for-spark-2.0.0-jar-with-dependencies.jar
6364
```
6465

6566
### Running
@@ -76,7 +77,7 @@ Building the SparkRDMA plugin requires [Apache Maven](http://maven.apache.org/)
7677

7778
1. Obtain a clone of [SparkRDMA](https://github.com/Mellanox/SparkRDMA)
7879

79-
2. Build the plugin for your Spark version (either 2.0.0, 2.1.0, 2.2.0 or 2.3.0), e.g. for Spark 2.0.0:
80+
2. Build the plugin for your Spark version (either 2.0.0, 2.1.0, 2.2.0, 2.3.0, 2.4.0), e.g. for Spark 2.0.0:
8081
```
8182
mvn -DskipTests clean package -Pspark-2.0.0
8283
```

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77

88
<groupId>com.github.mellanox</groupId>
99
<artifactId>spark-rdma</artifactId>
10-
<version>3.0</version>
10+
<version>3.1</version>
1111
<name>${project.artifactId}</name>
1212
<description>SparkRDMA Shuffle Manager Plugin</description>
1313
<inceptionYear>2017</inceptionYear>
@@ -61,6 +61,12 @@
6161
<spark.version>2.3.0</spark.version>
6262
</properties>
6363
</profile>
64+
<profile>
65+
<id>spark-2.4.0</id>
66+
<properties>
67+
<spark.version>2.4.0</spark.version>
68+
</properties>
69+
</profile>
6470
</profiles>
6571

6672
<dependencies>

src/main/java/org/apache/spark/shuffle/rdma/RdmaBuffer.java

Lines changed: 13 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -40,7 +40,18 @@ class RdmaBuffer {
4040
private final MemoryBlock block;
4141
private AtomicInteger refCount;
4242

43-
public static final UnsafeMemoryAllocator unsafeAlloc = new UnsafeMemoryAllocator();
43+
static final UnsafeMemoryAllocator unsafeAlloc = new UnsafeMemoryAllocator();
44+
public static final Constructor<?> directBufferConstructor;
45+
46+
static {
47+
try {
48+
Class<?> classDirectByteBuffer = Class.forName("java.nio.DirectByteBuffer");
49+
directBufferConstructor = classDirectByteBuffer.getDeclaredConstructor(long.class, int.class);
50+
directBufferConstructor.setAccessible(true);
51+
} catch (Exception e) {
52+
throw new RuntimeException("java.nio.DirectByteBuffer class not found");
53+
}
54+
}
4455

4556
RdmaBuffer(IbvPd ibvPd, int length) throws IOException {
4657
block = unsafeAlloc.allocate((long)length);
@@ -126,25 +137,10 @@ private void unregister() {
126137
}
127138

128139
ByteBuffer getByteBuffer() throws IOException {
129-
Class<?> classDirectByteBuffer;
130-
try {
131-
classDirectByteBuffer = Class.forName("java.nio.DirectByteBuffer");
132-
} catch (ClassNotFoundException e) {
133-
throw new IOException("java.nio.DirectByteBuffer class not found");
134-
}
135-
Constructor<?> constructor;
136-
try {
137-
constructor = classDirectByteBuffer.getDeclaredConstructor(long.class, int.class);
138-
} catch (NoSuchMethodException e) {
139-
throw new IOException("java.nio.DirectByteBuffer constructor not found");
140-
}
141-
constructor.setAccessible(true);
142-
ByteBuffer byteBuffer;
143140
try {
144-
byteBuffer = (ByteBuffer)constructor.newInstance(getAddress(), getLength());
141+
return (ByteBuffer)directBufferConstructor.newInstance(getAddress(), getLength());
145142
} catch (Exception e) {
146143
throw new IOException("java.nio.DirectByteBuffer exception: " + e.toString());
147144
}
148-
return byteBuffer;
149145
}
150146
}

src/main/java/org/apache/spark/shuffle/rdma/RdmaBufferManager.java

Lines changed: 8 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -25,9 +25,7 @@
2525
import java.util.concurrent.atomic.AtomicInteger;
2626
import java.util.concurrent.atomic.AtomicLong;
2727

28-
import com.ibm.disni.rdma.verbs.IbvMr;
2928
import com.ibm.disni.rdma.verbs.IbvPd;
30-
import com.ibm.disni.rdma.verbs.SVCRegMr;
3129
import org.slf4j.Logger;
3230
import org.slf4j.LoggerFactory;
3331
import scala.concurrent.ExecutionContext;
@@ -99,7 +97,7 @@ private void close() {
9997
private final ConcurrentHashMap<Integer, AllocatorStack> allocStackMap =
10098
new ConcurrentHashMap<>();
10199
private IbvPd pd;
102-
private IbvMr odpMr = null;
100+
private final boolean useOdp;
103101
private long maxCacheSize;
104102
private static final ExecutionContextExecutor globalScalaExecutor =
105103
ExecutionContext.Implicits$.MODULE$.global();
@@ -110,15 +108,12 @@ private void close() {
110108
this.minimumAllocationSize = Math.min(conf.recvWrSize(), MIN_BLOCK_SIZE);
111109
this.maxCacheSize = conf.maxBufferAllocationSize();
112110
if (conf.useOdp(pd.getContext())) {
113-
int access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE |
114-
IbvMr.IBV_ACCESS_REMOTE_READ | IbvMr.IBV_ACCESS_ON_DEMAND;
115-
116-
SVCRegMr sMr = pd.regMr(0, -1, access).execute();
117-
this.odpMr = sMr.getMr();
111+
useOdp = true;
118112
if (conf.collectOdpStats()) {
119113
odpStats = new OdpStats(conf);
120114
}
121-
sMr.free();
115+
} else {
116+
useOdp = false;
122117
}
123118
}
124119

@@ -217,9 +212,9 @@ private void cleanLRUStacks(long idleBuffersSize) {
217212

218213
IbvPd getPd() { return this.pd; }
219214

220-
IbvMr getOdpMr() { return this.odpMr; }
215+
boolean useOdp() { return this.useOdp; }
221216

222-
void stop() throws IOException {
217+
void stop() {
223218
logger.info("Rdma buffers allocation statistics:");
224219
for (Integer size : allocStackMap.keySet()) {
225220
AllocatorStack allocatorStack = allocStackMap.remove(size);
@@ -230,11 +225,8 @@ void stop() throws IOException {
230225
}
231226
}
232227

233-
if (odpMr != null) {
234-
odpMr.deregMr().execute().free();
235-
if (odpStats != null) {
236-
odpStats.printODPStatistics();
237-
}
228+
if (useOdp && odpStats != null) {
229+
odpStats.printODPStatistics();
238230
}
239231
}
240232
}

src/main/java/org/apache/spark/shuffle/rdma/RdmaChannel.java

Lines changed: 15 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ public class RdmaChannel {
4343
private final ConcurrentHashMap<Integer, ConcurrentLinkedDeque<SVCPostSend>> svcPostSendCache =
4444
new ConcurrentHashMap();
4545

46-
enum RdmaChannelType { RPC_REQUESTOR, RPC_RESPONDER, RDMA_READ_REQUESTOR, RDMA_READ_RESPONDER }
46+
enum RdmaChannelType { RPC, RDMA_READ_REQUESTOR, RDMA_READ_RESPONDER }
4747
private final RdmaChannelType rdmaChannelType;
4848

4949
private final RdmaCompletionListener receiveListener;
@@ -130,6 +130,7 @@ private class CompletionInfo {
130130
// NOOP_RESERVED_INDEX is used for send operations that do not require a callback
131131
private static final int NOOP_RESERVED_INDEX = 0;
132132
private final AtomicInteger completionInfoIndex = new AtomicInteger(NOOP_RESERVED_INDEX);
133+
private final RdmaShuffleConf conf;
133134

134135
RdmaChannel(
135136
RdmaChannelType rdmaChannelType,
@@ -152,32 +153,20 @@ private class CompletionInfo {
152153
this.receiveListener = receiveListener;
153154
this.rdmaBufferManager = rdmaBufferManager;
154155
this.cpuVector = cpuVector;
156+
this.conf = conf;
155157

156158
switch (rdmaChannelType) {
157-
case RPC_REQUESTOR:
158-
// Requires full-size sends, and receives for credit reports only
159+
case RPC:
160+
// Single bidirectional QP between executors and driver.
159161
if (conf.swFlowControl()) {
160-
this.recvDepth = RECV_CREDIT_REPORT_RATIO;
161-
this.remoteRecvCredits = new Semaphore(conf.recvQueueDepth(), false);
162-
} else {
163-
this.recvDepth = 0;
162+
this.remoteRecvCredits = new Semaphore(
163+
conf.recvQueueDepth() - RECV_CREDIT_REPORT_RATIO, false);
164164
}
165-
this.recvWrSize = 0;
166-
this.sendDepth = conf.sendQueueDepth();
167-
this.sendBudgetSemaphore = new Semaphore(sendDepth, false);
168-
break;
169-
170-
case RPC_RESPONDER:
171-
// Requires full-size receives and sends for credit reports only
172165
this.recvDepth = conf.recvQueueDepth();
173166
this.recvWrSize = conf.recvWrSize();
174-
if (conf.swFlowControl()) {
175-
this.sendDepth = RECV_CREDIT_REPORT_RATIO;
176-
} else {
177-
this.sendDepth = 0;
178-
}
167+
this.sendDepth = conf.sendQueueDepth();
168+
this.sendBudgetSemaphore = new Semaphore(sendDepth - RECV_CREDIT_REPORT_RATIO, false);
179169
break;
180-
181170
case RDMA_READ_REQUESTOR:
182171
// Requires sends only, no need for any receives
183172
this.recvDepth = 0;
@@ -322,6 +311,10 @@ void connect(InetSocketAddress socketAddress) throws IOException {
322311
setRdmaChannelState(RdmaChannelState.CONNECTED);
323312
}
324313

314+
InetSocketAddress getSourceSocketAddress() throws IOException {
315+
return (InetSocketAddress)cmId.getSource();
316+
}
317+
325318
void accept() throws IOException {
326319
RdmaConnParam connParams = new RdmaConnParam();
327320

@@ -778,7 +771,7 @@ private void exhaustCq() throws IOException {
778771
}
779772
}
780773

781-
if (sendDepth == RECV_CREDIT_REPORT_RATIO) {
774+
if (conf.swFlowControl() && rdmaChannelType == RdmaChannelType.RPC) {
782775
// Software-level flow control is enabled
783776
localRecvCreditsPendingReport += reclaimedRecvWrs;
784777
if (localRecvCreditsPendingReport > (recvDepth / RECV_CREDIT_REPORT_RATIO)) {
@@ -895,7 +888,7 @@ void stop() throws InterruptedException, IOException {
895888
int ret = cmId.disconnect();
896889
if (ret != 0) {
897890
logger.error("disconnect failed with errno: " + ret);
898-
} else if (rdmaChannelType.equals(RdmaChannelType.RPC_REQUESTOR) ||
891+
} else if (rdmaChannelType.equals(RdmaChannelType.RPC) ||
899892
rdmaChannelType.equals(RdmaChannelType.RDMA_READ_REQUESTOR)) {
900893
try {
901894
processRdmaCmEvent(RdmaCmEvent.EventType.RDMA_CM_EVENT_DISCONNECTED.ordinal(),

src/main/java/org/apache/spark/shuffle/rdma/RdmaMappedFile.java

100644100755
Lines changed: 9 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -40,9 +40,9 @@ public class RdmaMappedFile {
4040
private FileChannel fileChannel;
4141

4242
private final IbvPd ibvPd;
43-
private IbvMr odpMr;
4443

4544
private final RdmaMapTaskOutput rdmaMapTaskOutput;
45+
private final RdmaBufferManager rdmaBufferManager;
4646

4747
public RdmaMapTaskOutput getRdmaMapTaskOutput() { return rdmaMapTaskOutput; }
4848

@@ -79,8 +79,7 @@ public RdmaMappedFile(File file, int chunkSize, long[] partitionLengths,
7979
IllegalAccessException {
8080
this.file = file;
8181
this.ibvPd = rdmaBufferManager.getPd();
82-
this.odpMr = rdmaBufferManager.getOdpMr();
83-
82+
this.rdmaBufferManager = rdmaBufferManager;
8483
final RandomAccessFile backingFile = new RandomAccessFile(file, "rw");
8584
this.fileChannel = backingFile.getChannel();
8685

@@ -136,7 +135,7 @@ private void mapAndRegister(int chunkSize, long[] partitionLengths) throws IOExc
136135
curPartition,
137136
rdmaFileMapping.address + curLength - partitionLengths[curPartition],
138137
(int)partitionLengths[curPartition],
139-
(rdmaFileMapping.ibvMr != null) ? rdmaFileMapping.ibvMr.getLkey() : odpMr.getLkey());
138+
rdmaFileMapping.ibvMr.getLkey());
140139
curPartition++;
141140
}
142141
}
@@ -157,15 +156,15 @@ private void mapAndRegister(long fileOffset, long length) throws IOException,
157156
}
158157

159158
IbvMr ibvMr = null;
160-
if (odpMr == null) {
159+
if (!rdmaBufferManager.useOdp()) {
161160
SVCRegMr svcRegMr = ibvPd.regMr(address, (int)length, ACCESS).execute();
162161
ibvMr = svcRegMr.getMr();
163162
svcRegMr.free();
164163
} else {
165-
int ret = odpMr.expPrefetchMr(address, (int)length);
166-
if (ret != 0) {
167-
throw new IOException("expPrefetchMr failed with: " + ret);
168-
}
164+
SVCRegMr svcRegMr = ibvPd.regMr(address, (int)length,
165+
ACCESS | IbvMr.IBV_ACCESS_ON_DEMAND).execute();
166+
ibvMr = svcRegMr.getMr();
167+
svcRegMr.free();
169168
}
170169

171170
rdmaFileMappings.add(new RdmaFileMapping(ibvMr, address, mapAddress, length, alignedLength));
@@ -201,30 +200,14 @@ public void dispose() throws IOException, InvocationTargetException, IllegalAcce
201200
}
202201

203202
private ByteBuffer getByteBuffer(long address, int length) throws IOException {
204-
Class<?> classDirectByteBuffer;
205203
try {
206-
classDirectByteBuffer = Class.forName("java.nio.DirectByteBuffer");
207-
} catch (ClassNotFoundException e) {
208-
throw new IOException("java.nio.DirectByteBuffer class not found");
209-
}
210-
Constructor<?> constructor;
211-
try {
212-
constructor = classDirectByteBuffer.getDeclaredConstructor(long.class, int.class);
213-
} catch (NoSuchMethodException e) {
214-
throw new IOException("java.nio.DirectByteBuffer constructor not found");
215-
}
216-
constructor.setAccessible(true);
217-
ByteBuffer byteBuffer;
218-
try {
219-
byteBuffer = (ByteBuffer)constructor.newInstance(address, length);
204+
return (ByteBuffer)RdmaBuffer.directBufferConstructor.newInstance(address, length);
220205
} catch (InvocationTargetException ex) {
221206
throw new IOException("java.nio.DirectByteBuffer: " +
222207
"InvocationTargetException: " + ex.getTargetException());
223208
} catch (Exception e) {
224209
throw new IOException("java.nio.DirectByteBuffer exception: " + e.toString());
225210
}
226-
227-
return byteBuffer;
228211
}
229212

230213
public ByteBuffer getByteBufferForPartition(int partitionId) throws IOException {

0 commit comments

Comments (0)