Skip to content

Commit 16e9aaa

Browse files
committed
Fix issue #41
Introduce isResourceAllocated and isError; support reconnect.
1 parent cfcee63 commit 16e9aaa

File tree

2 files changed

+49
-41
lines changed

2 files changed

+49
-41
lines changed

src/main/java/com/ibm/disni/RdmaActiveEndpointGroup.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public IbvQP createQpProvider(C endpoint) throws IOException{
100100

101101
public void allocateResources(C endpoint) throws Exception {
102102
endpoint.allocateResources();
103-
}
103+
}
104104

105105
public void close() throws IOException, InterruptedException {
106106
super.close();
@@ -110,9 +110,11 @@ public void close() throws IOException, InterruptedException {
110110
}
111111

112112
void close(RdmaEndpoint endpoint) throws IOException {
113-
IbvContext context = endpoint.getIdPriv().getVerbs();
114-
RdmaActiveCqProcessor<C> cqProcessor = cqMap.get(context.getCmd_fd());
115-
cqProcessor.unregister(endpoint);
113+
if (endpoint.isResouceAllocated()) {
114+
IbvContext context = endpoint.getIdPriv().getVerbs();
115+
RdmaActiveCqProcessor<C> cqProcessor = cqMap.get(context.getCmd_fd());
116+
cqProcessor.unregister(endpoint);
117+
}
116118
}
117119

118120
public int getMaxWR() {

src/main/java/com/ibm/disni/RdmaEndpoint.java

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,15 @@ public class RdmaEndpoint {
6868
private boolean isClosed;
6969
private boolean isInitialized;
7070
private boolean serverSide;
71-
71+
private boolean isError;
72+
7273
protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId idPriv, boolean serverSide) throws IOException{
7374
this.endpointId = group.getNextId();
7475
this.group = group;
7576
this.idPriv = idPriv;
76-
this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;
77-
77+
this.access = IbvMr.IBV_ACCESS_LOCAL_WRITE | IbvMr.IBV_ACCESS_REMOTE_WRITE | IbvMr.IBV_ACCESS_REMOTE_READ;
78+
this.isError = false;
79+
7880
this.qp = null;
7981
this.pd = null;
8082
this.cqProcessor = null;
@@ -84,7 +86,7 @@ protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId
8486
this.serverSide = serverSide;
8587
logger.info("new client endpoint, id " + endpointId + ", idPriv " + idPriv.getPs());
8688
}
87-
89+
8890
/**
8991
/**
9092
/**
@@ -94,41 +96,44 @@ protected RdmaEndpoint(RdmaEndpointGroup<? extends RdmaEndpoint> group, RdmaCmId
9496
* @param timeout connection timeout
9597
*/
9698
public synchronized void connect(SocketAddress dst, int timeout) throws Exception {
97-
if (connState != CONN_STATE_INITIALIZED) {
99+
if (connState != CONN_STATE_INITIALIZED && !isError) {
98100
throw new IOException("endpoint already connected");
99101
}
100-
idPriv.resolveAddr(null, dst, timeout);
101-
while(connState < CONN_STATE_ADDR_RESOLVED){
102-
wait();
102+
isError = false;
103+
if (connState < CONN_STATE_ADDR_RESOLVED) {
104+
idPriv.resolveAddr(null, dst, timeout);
105+
while (connState < CONN_STATE_ADDR_RESOLVED && !isError) {
106+
wait();
107+
}
108+
if (isError)
109+
throw new IOException("resolve address failed");
103110
}
104-
if (connState != CONN_STATE_ADDR_RESOLVED){
105-
throw new IOException("resolve address failed");
111+
if (connState < CONN_STATE_ROUTE_RESOLVED) {
112+
idPriv.resolveRoute(timeout);
113+
while (connState < CONN_STATE_ROUTE_RESOLVED && !isError) {
114+
wait();
115+
}
116+
if (isError)
117+
throw new IOException("resolve route failed");
106118
}
107-
108-
idPriv.resolveRoute(timeout);
109-
while(connState < CONN_STATE_ROUTE_RESOLVED){
110-
wait();
119+
if (connState < CONN_STATE_RESOURCES_ALLOCATED) {
120+
group.allocateResourcesRaw(this);
121+
while (connState < CONN_STATE_RESOURCES_ALLOCATED && !isError) {
122+
wait();
123+
}
124+
if (isError)
125+
throw new IOException("allocate resource failed");
111126
}
112-
if (connState != CONN_STATE_ROUTE_RESOLVED){
113-
throw new IOException("resolve route failed");
114-
}
115-
116-
group.allocateResourcesRaw(this);
117-
while(connState < CONN_STATE_RESOURCES_ALLOCATED){
118-
wait();
119-
}
120-
if (connState != CONN_STATE_RESOURCES_ALLOCATED){
121-
throw new IOException("resources allocation failed");
122-
}
123-
124127
RdmaConnParam connParam = getConnParam();
125128
idPriv.connect(connParam);
126-
127-
while(connState < CONN_STATE_CONNECTED){
129+
130+
while(connState < CONN_STATE_CONNECTED && !isError){
128131
wait();
129-
}
130-
}
131-
132+
}
133+
if (isError)
134+
throw new IOException("idPriv connect failed");
135+
}
136+
132137
/* (non-Javadoc)
133138
* @see com.ibm.jverbs.endpoints.ICmConsumer#dispatchCmEvent(com.ibm.jverbs.cm.RdmaCmEvent)
134139
*/
@@ -138,28 +143,26 @@ public synchronized void dispatchCmEvent(RdmaCmEvent cmEvent)
138143
int eventType = cmEvent.getEvent();
139144
if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ADDR_RESOLVED.ordinal()) {
140145
connState = RdmaEndpoint.CONN_STATE_ADDR_RESOLVED;
141-
notifyAll();
142146
} else if (cmEvent.getEvent() == RdmaCmEvent.EventType.RDMA_CM_EVENT_ROUTE_RESOLVED.ordinal()) {
143147
connState = RdmaEndpoint.CONN_STATE_ROUTE_RESOLVED;
144-
notifyAll();
145148
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_ESTABLISHED.ordinal()) {
146149
logger.info("got event type + RDMA_CM_EVENT_ESTABLISHED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
147150
connState = CONN_STATE_CONNECTED;
148-
notifyAll();
149151
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_DISCONNECTED.ordinal()) {
150152
logger.info("got event type + RDMA_CM_EVENT_DISCONNECTED, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
151153
connState = CONN_STATE_CLOSED;
152-
notifyAll();
153154
} else if (eventType == RdmaCmEvent.EventType.RDMA_CM_EVENT_CONNECT_REQUEST.ordinal()) {
154155
logger.info("got event type + RDMA_CM_EVENT_CONNECT_REQUEST, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
155156
} else {
156157
logger.info("got event type + UNKNOWN, srcAddress " + this.getSrcAddr() + ", dstAddress " + this.getDstAddr());
158+
isError = true;
157159
}
160+
notifyAll();
158161
} catch (Exception e) {
159162
throw new IOException(e);
160163
}
161164
}
162-
165+
163166
public final synchronized void allocateResources() throws IOException {
164167
if (!isInitialized) {
165168
this.pd = group.createProtectionDomainRaw(this);
@@ -200,7 +203,7 @@ public synchronized void close() throws IOException, InterruptedException {
200203
if (isClosed){
201204
return;
202205
}
203-
206+
204207
logger.info("closing client endpoint");
205208
if (connState == CONN_STATE_CONNECTED) {
206209
idPriv.disconnect();
@@ -223,6 +226,9 @@ public synchronized void close() throws IOException, InterruptedException {
223226
public synchronized boolean isConnected() {
224227
return (connState == CONN_STATE_CONNECTED);
225228
}
229+
public synchronized boolean isResouceAllocated() {
230+
return (connState == CONN_STATE_RESOURCES_ALLOCATED);
231+
}
226232

227233
/**
228234
* Checks if the endpoint is closed.

0 commit comments

Comments
 (0)