Skip to content

Commit 2586e0c

Browse files
committed
fix issue #41
introduce isResourceAllocated() and support reconnect.
1 parent aa27636 commit 2586e0c

File tree

2 files changed

+60
-51
lines changed

2 files changed

+60
-51
lines changed

src/main/java/com/ibm/disni/RdmaActiveEndpointGroup.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public IbvQP createQpProvider(C endpoint) throws IOException{
100100

101101
public void allocateResources(C endpoint) throws Exception {
102102
endpoint.allocateResources();
103-
}
103+
}
104104

105105
public void close() throws IOException, InterruptedException {
106106
super.close();
@@ -110,9 +110,11 @@ public void close() throws IOException, InterruptedException {
110110
}
111111

112112
void close(RdmaEndpoint endpoint) throws IOException {
113-
IbvContext context = endpoint.getIdPriv().getVerbs();
114-
RdmaActiveCqProcessor<C> cqProcessor = cqMap.get(context.getCmd_fd());
115-
cqProcessor.unregister(endpoint);
113+
if (endpoint.isResouceAllocated()) {
114+
IbvContext context = endpoint.getIdPriv().getVerbs();
115+
RdmaActiveCqProcessor<C> cqProcessor = cqMap.get(context.getCmd_fd());
116+
cqProcessor.unregister(endpoint);
117+
}
116118
}
117119

118120
public int getMaxWR() {

src/main/java/com/ibm/disni/RdmaEndpoint.java

Lines changed: 54 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,15 @@ public class RdmaEndpoint {
6868
private boolean isClosed;
6969
private boolean isInitialized;
7070
private boolean serverSide;
71-
71+
private boolean isError;
72+
7273
protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId idPriv, boolean serverSide) throws IOException{
7374
this.endpointId = group.getNextId();
7475
this.group = group;
7576
this.idPriv = idPriv;
76-
this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;
77-
77+
this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;
78+
this.isError = false;
79+
7880
this.qp = null;
7981
this.pd = null;
8082
this.cqProcessor = null;
@@ -84,7 +86,7 @@ protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId
8486
this.serverSide = serverSide;
8587
logger.info("new client endpoint, id " + endpointId + ", idPriv " + idPriv.getPs());
8688
}
87-
89+
8890
/**
8991
/**
9092
/**
@@ -94,41 +96,44 @@ protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId
9496
* @param timeout connection timeout
9597
*/
9698
public synchronized void connect(SocketAddress dst, int timeout) throws Exception {
97-
if (connState != CONN_STATE_INITIALIZED) {
99+
if (connState != CONN_STATE_INITIALIZED && !isError) {
98100
throw new IOException("endpoint already connected");
99101
}
100-
idPriv.resolveAddr(null, dst, timeout);
101-
while(connState < CONN_STATE_ADDR_RESOLVED){
102-
wait();
102+
isError = false;
103+
if (connState < CONN_STATE_ADDR_RESOLVED) {
104+
idPriv.resolveAddr(null, dst, timeout);
105+
while (connState < CONN_STATE_ADDR_RESOLVED && !isError) {
106+
wait();
107+
}
108+
if (isError)
109+
throw new IOException("resolve address failed");
103110
}
104-
if (connState != CONN_STATE_ADDR_RESOLVED){
105-
throw new IOException("resolve address failed");
111+
if (connState < CONN_STATE_ROUTE_RESOLVED) {
112+
idPriv.resolveRoute(timeout);
113+
while (connState < CONN_STATE_ROUTE_RESOLVED && !isError) {
114+
wait();
115+
}
116+
if (isError)
117+
throw new IOException("resolve route failed");
106118
}
107-
108-
idPriv.resolveRoute(timeout);
109-
while(connState < CONN_STATE_ROUTE_RESOLVED){
110-
wait();
119+
if (connState < CONN_STATE_RESOURCES_ALLOCATED) {
120+
group.allocateResourcesRaw(this);
121+
while (connState < CONN_STATE_RESOURCES_ALLOCATED && !isError) {
122+
wait();
123+
}
124+
if (isError)
125+
throw new IOException("allocate resource failed");
111126
}
112-
if (connState != CONN_STATE_ROUTE_RESOLVED){
113-
throw new IOException("resolve route failed");
114-
}
115-
116-
group.allocateResourcesRaw(this);
117-
while(connState < CONN_STATE_RESOURCES_ALLOCATED){
118-
wait();
119-
}
120-
if (connState != CONN_STATE_RESOURCES_ALLOCATED){
121-
throw new IOException("resolve route failed");
122-
}
123-
124127
RdmaConnParam connParam = getConnParam();
125128
idPriv.connect(connParam);
126-
127-
while(connState < CONN_STATE_CONNECTED){
129+
130+
while(connState < CONN_STATE_CONNECTED && !isError){
128131
wait();
129-
}
130-
}
131-
132+
}
133+
if (isError)
134+
throw new IOException("idPriv connect failed");
135+
}
136+
132137
/* (non-Javadoc)
133138
* @see com.ibm.jverbs.endpoints.ICmConsumer#dispatchCmEvent(com.ibm.jverbs.cm.RdmaCmEvent)
134139
*/
@@ -138,28 +143,27 @@ public synchronized void dispatchCmEvent(RdmaCmEvent cmEvent)
138143
int eventType = cmEvent.getEvent();
139144
if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ADDR_RESOLVED.ordinal()) {
140145
connState = RdmaEndpoint.CONN_STATE_ADDR_RESOLVED;
141-
notifyAll();
142146
} else if (cmEvent.getEvent() == RdmaCmEvent.EventType.RDMA_CM_EVENT_ROUTE_RESOLVED.ordinal()) {
143147
connState = RdmaEndpoint.CONN_STATE_ROUTE_RESOLVED;
144-
notifyAll();
145148
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ESTABLISHED.ordinal()) {
146149
logger.info("got event type + RDMA_CM_EVENT_ESTABLISHED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
147150
connState = CONN_STATE_CONNECTED;
148-
notifyAll();
149151
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_DISCONNECTED.ordinal()) {
150152
logger.info("got event type + RDMA_CM_EVENT_DISCONNECTED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
151153
connState = CONN_STATE_CLOSED;
152-
notifyAll();
153154
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_CONNECT_REQUEST.ordinal()) {
154155
logger.info("got event type + RDMA_CM_EVENT_CONNECT_REQUEST, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
155156
} else {
157+
System.err.println("got other event: " + eventType);
156158
logger.info("got event type + UNKNOWN, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
159+
isError = true;
157160
}
161+
notifyAll();
158162
} catch (Exception e) {
159163
throw new IOException(e);
160164
}
161165
}
162-
166+
163167
public final synchronized void allocateResources() throws IOException {
164168
if (!isInitialized) {
165169
this.pd = group.createProtectionDomainRaw(this);
@@ -176,31 +180,31 @@ synchronized void accept() throws Exception {
176180
group.allocateResourcesRaw(this);
177181
while(connState < CONN_STATE_RESOURCES_ALLOCATED){
178182
wait();
179-
}
183+
}
180184
if (connState != CONN_STATE_RESOURCES_ALLOCATED){
181185
throw new IOException("resolve route failed");
182-
}
183-
186+
}
187+
184188
RdmaConnParam connParam = getConnParam();
185-
idPriv.accept(connParam);
189+
idPriv.accept(connParam);
186190
while(connState < CONN_STATE_CONNECTED){
187191
wait();
188-
}
192+
}
189193
}
190194

191195
/**
192-
* Close this endpoint.
193-
*
194-
* This closes the connection and frees all the resources, e.g., queue pair.
195-
* @throws InterruptedException
196+
* Close this endpoint.
197+
*
198+
* This closes the connection and frees all the resources, e.g., queue pair.
199+
* @throws InterruptedException
196200
*
197201
* @throws Exception the exception
198202
*/
199203
public synchronized void close() throws IOException, InterruptedException {
200204
if (isClosed){
201205
return;
202206
}
203-
207+
204208
logger.info("closing client endpoint");
205209
if (connState == CONN_STATE_CONNECTED) {
206210
idPriv.disconnect();
@@ -214,7 +218,7 @@ public synchronized void close() throws IOException, InterruptedException {
214218
isClosed = true;
215219
logger.info("closing client done");
216220
}
217-
221+
218222
/**
219223
* Checks if the endpoint is connected.
220224
*
@@ -223,6 +227,9 @@ public synchronized void close() throws IOException, InterruptedException {
223227
public synchronized boolean isConnected() {
224228
return (connState == CONN_STATE_CONNECTED);
225229
}
230+
public synchronized boolean isResouceAllocated() {
231+
return (connState == CONN_STATE_RESOURCES_ALLOCATED);
232+
}
226233

227234
/**
228235
* Checks if the endpoint is closed.

0 commit comments

Comments
 (0)