Skip to content
This repository was archived by the owner on Dec 20, 2022. It is now read-only.

Commit e93b525

Browse files
author
Yuval Degani
committed
Initial commit
0 parents  commit e93b525

31 files changed

+4874
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
target/
2+
*.iml

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Yuval Degani <[email protected]>

LICENSE

Lines changed: 299 additions & 0 deletions
Large diffs are not rendered by default.

README.md

Whitespace-only changes.

pom.xml

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
5+
http://maven.apache.org/maven-v4_0_0.xsd">
6+
<modelVersion>4.0.0</modelVersion>
7+
8+
<groupId>com.mellanox.rdma</groupId>
9+
<artifactId>spark-rdma</artifactId>
10+
<version>1.0</version>
11+
<name>SparkRDMA Shuffle Manager Plugin</name>
12+
<packaging>jar</packaging>
13+
14+
<licenses>
15+
<license>
16+
<name>Apache 2.0 License</name>
17+
<url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
18+
<distribution>repo</distribution>
19+
</license>
20+
</licenses>
21+
22+
<developers>
23+
<developer>
24+
<name>Yuval Degani</name>
25+
<email>[email protected]</email>
26+
</developer>
27+
</developers>
28+
29+
<properties>
30+
<maven.compiler.source>1.8</maven.compiler.source>
31+
<maven.compiler.target>1.8</maven.compiler.target>
32+
<encoding>UTF-8</encoding>
33+
<scala.version>2.11.8</scala.version>
34+
<scala.compat.version>2.11</scala.compat.version>
35+
<spark.version>2.0.0</spark.version>
36+
</properties>
37+
38+
<profiles>
39+
<profile>
40+
<id>spark-2.0.0</id>
41+
<properties>
42+
<spark.version>2.0.0</spark.version>
43+
</properties>
44+
</profile>
45+
</profiles>
46+
47+
<dependencies>
48+
<dependency>
49+
<groupId>org.apache.spark</groupId>
50+
<artifactId>spark-core_2.11</artifactId>
51+
<version>${spark.version}</version>
52+
<scope>provided</scope>
53+
</dependency>
54+
<dependency>
55+
<groupId>com.ibm.disni</groupId>
56+
<artifactId>disni</artifactId>
57+
<version>1.2</version>
58+
</dependency>
59+
</dependencies>
60+
61+
<build>
62+
<plugins>
63+
<plugin>
64+
<groupId>org.scala-tools</groupId>
65+
<artifactId>maven-scala-plugin</artifactId>
66+
<version>2.15.2</version>
67+
<executions>
68+
<execution>
69+
<id>compile</id>
70+
<goals>
71+
<goal>compile</goal>
72+
</goals>
73+
<phase>compile</phase>
74+
</execution>
75+
<execution>
76+
<phase>process-resources</phase>
77+
<goals>
78+
<goal>compile</goal>
79+
</goals>
80+
</execution>
81+
</executions>
82+
</plugin>
83+
<plugin>
84+
<artifactId>maven-assembly-plugin</artifactId>
85+
<version>3.0.0</version>
86+
<configuration>
87+
<descriptorRefs>
88+
<descriptorRef>jar-with-dependencies</descriptorRef>
89+
</descriptorRefs>
90+
</configuration>
91+
<executions>
92+
<execution>
93+
<id>make-assembly</id>
94+
<phase>package</phase>
95+
<goals>
96+
<goal>single</goal>
97+
</goals>
98+
</execution>
99+
</executions>
100+
</plugin>
101+
</plugins>
102+
</build>
103+
</project>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.rdma;
19+
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.nio.ByteBuffer;
23+
24+
public class ByteBufferBackedInputStream extends InputStream {
25+
private final ByteBuffer buf;
26+
27+
public ByteBufferBackedInputStream(ByteBuffer buf) {
28+
this.buf = buf;
29+
}
30+
31+
public int read() throws IOException {
32+
if (!buf.hasRemaining()) {
33+
return -1;
34+
}
35+
return buf.get() & 0xFF;
36+
}
37+
38+
public int read(byte[] bytes, int off, int len) throws IOException {
39+
if (!buf.hasRemaining()) {
40+
return -1;
41+
}
42+
43+
len = Math.min(len, buf.remaining());
44+
buf.get(bytes, off, len);
45+
return len;
46+
}
47+
48+
public int available() {
49+
return buf.remaining();
50+
}
51+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.rdma;
19+
20+
import java.io.IOException;
21+
import java.io.OutputStream;
22+
import java.nio.ByteBuffer;
23+
24+
class ByteBufferBackedOutputStream extends OutputStream {
25+
private final ByteBuffer buf;
26+
27+
ByteBufferBackedOutputStream(ByteBuffer buf) {
28+
this.buf = buf;
29+
}
30+
31+
public void write(int b) throws IOException {
32+
buf.put((byte) b);
33+
}
34+
35+
public void write(byte[] bytes, int off, int len)
36+
throws IOException {
37+
buf.put(bytes, off, len);
38+
}
39+
}
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.shuffle.rdma;
19+
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
import sun.misc.Unsafe;
23+
import com.ibm.disni.rdma.verbs.IbvPd;
24+
import com.ibm.disni.rdma.verbs.SVCRegMr;
25+
import com.ibm.disni.rdma.verbs.IbvMr;
26+
import sun.nio.ch.DirectBuffer;
27+
28+
import java.io.IOException;
29+
import java.lang.reflect.Constructor;
30+
import java.lang.reflect.Field;
31+
import java.nio.ByteBuffer;
32+
33+
public class RdmaBuffer {
34+
private static final Logger logger = LoggerFactory.getLogger(RdmaBuffer.class);
35+
private static final int BYTE_ARRAY_OFFSET;
36+
37+
private IbvMr ibvMr = null;
38+
private final long address;
39+
private final int length;
40+
41+
private static final Unsafe unsafe;
42+
static {
43+
try {
44+
Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
45+
unsafeField.setAccessible(true);
46+
unsafe = (Unsafe)unsafeField.get(null);
47+
} catch (IllegalAccessException | NoSuchFieldException e) {
48+
logger.error("Failed to retrieve the Unsafe");
49+
throw new RuntimeException(e);
50+
}
51+
52+
BYTE_ARRAY_OFFSET = unsafe.arrayBaseOffset(byte[].class);
53+
}
54+
55+
public RdmaBuffer(IbvPd ibvPd, int length) throws IOException {
56+
address = unsafe.allocateMemory((long)length);
57+
this.length = length;
58+
register(ibvPd);
59+
}
60+
61+
public RdmaBuffer(int length) {
62+
address = unsafe.allocateMemory((long)length);
63+
this.length = length;
64+
}
65+
66+
public long getAddress() {
67+
return address;
68+
}
69+
public int getLength() {
70+
return length;
71+
}
72+
public int getLkey() {
73+
return ibvMr.getLkey();
74+
}
75+
76+
public void free() {
77+
unregister();
78+
unsafe.freeMemory(address);
79+
}
80+
81+
private void register(IbvPd ibvPd) throws IOException {
82+
int access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE |
83+
IbvMr.IBV_ACCESS_REMOTE_READ;
84+
85+
SVCRegMr sMr = ibvPd.regMr(getAddress(), getLength(), access).execute();
86+
ibvMr = sMr.getMr();
87+
sMr.free();
88+
}
89+
90+
private void unregister() {
91+
if (ibvMr != null) {
92+
try {
93+
ibvMr.deregMr().execute().free();
94+
} catch (IOException e) {
95+
logger.warn("Deregister MR failed");
96+
}
97+
ibvMr = null;
98+
}
99+
}
100+
101+
public void write(DirectBuffer buf, long srcOffset, long dstOffset, long length) {
102+
unsafe.copyMemory(buf.address() + srcOffset, getAddress() + dstOffset, length);
103+
}
104+
105+
public void write(byte[] bytes, long srcOffset, long dstOffset, long length) {
106+
unsafe.copyMemory(
107+
bytes,
108+
BYTE_ARRAY_OFFSET + srcOffset,
109+
null,
110+
getAddress() + dstOffset,
111+
length);
112+
}
113+
114+
public ByteBuffer getByteBuffer() throws IOException {
115+
Class<?> classDirectByteBuffer;
116+
try {
117+
classDirectByteBuffer = Class.forName("java.nio.DirectByteBuffer");
118+
} catch (ClassNotFoundException e) {
119+
throw new IOException("java.nio.DirectByteBuffer class not found");
120+
}
121+
Constructor<?> constructor;
122+
try {
123+
constructor = classDirectByteBuffer.getDeclaredConstructor(long.class, int.class);
124+
} catch (NoSuchMethodException e) {
125+
throw new IOException("java.nio.DirectByteBuffer constructor not found");
126+
}
127+
constructor.setAccessible(true);
128+
ByteBuffer byteBuffer;
129+
try {
130+
byteBuffer = (ByteBuffer)constructor.newInstance(getAddress(), getLength());
131+
} catch (Exception e) {
132+
throw new IOException("java.nio.DirectByteBuffer exception: " + e.toString());
133+
}
134+
135+
return byteBuffer;
136+
}
137+
}

0 commit comments

Comments
 (0)