Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/main/java/io/socket/IOAcknowledgeWithTimeout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.socket;

/**
* The interface acknowledge that supports timeout
* @author pjf
*
*/
public interface IOAcknowledgeWithTimeout extends IOAcknowledge {

/**
* Returns the desired timeout for the message
* @return timeout in ms
*/
long getTimeout();

/**
* Called when the acknowledgement does not arrive in time
*/
void timeout();

}
9 changes: 7 additions & 2 deletions src/main/java/io/socket/IOCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,13 @@ public interface IOCallback {
* On connect. Called when the socket becomes ready so it is now able to receive data
*/
void onConnect();

/**

/**
* On reconnect. Called when the socket successfully reconnects
*/
void onReconnect();

/**
* On message. Called when the server sends String data.
*
* @param data the data.
Expand Down
98 changes: 78 additions & 20 deletions src/main/java/io/socket/IOConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@
*/
package io.socket;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
Expand All @@ -29,13 +36,6 @@
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonParser;
import com.google.gson.JsonPrimitive;

/**
* The Class IOConnection.
*/
Expand Down Expand Up @@ -129,7 +129,7 @@ class IOConnection implements IOCallback {
private int nextId = 1;

/** Acknowledges. */
HashMap<Integer, IOAcknowledge> acknowledge = new HashMap<Integer, IOAcknowledge>();
HashMap<Integer, AckEntry> acknowledge = new HashMap<Integer, AckEntry>();

/** true if there's already a keepalive in {@link #outputBuffer}. */
private boolean keepAliveInQueue;
Expand Down Expand Up @@ -382,6 +382,39 @@ public void ack(JsonElement... args) {
};
}

private static class AckEntry {
AckTimeoutTask ackTimeoutTask;
IOAcknowledge ack;
}

/**
* The Class AckTimeoutTask. Handles timeouts on ACK
*/
private class AckTimeoutTask extends TimerTask {

int ackId;
IOAcknowledgeWithTimeout ack;

AckTimeoutTask(int ackId, IOAcknowledgeWithTimeout ack) {
this.ackId = ackId;
this.ack = ack;
}

/*
* (non-Javadoc)
*
* @see java.util.TimerTask#run()
*/
@Override
public void run() {
synchronized (acknowledge) {
acknowledge.remove(ackId);
}
logger.warning("! ack " + ackId + " timeout");
ack.timeout();
}
}

/**
* adds an {@link IOAcknowledge} to an {@link IOMessage}.
*
Expand All @@ -393,8 +426,20 @@ public void ack(JsonElement... args) {
private void synthesizeAck(IOMessage message, IOAcknowledge ack) {
if (ack != null) {
int id = nextId++;
acknowledge.put(id, ack);
AckEntry entry = new AckEntry();
entry.ack = ack;
if (ack instanceof IOAcknowledgeWithTimeout) {
IOAcknowledgeWithTimeout timeoutAck = (IOAcknowledgeWithTimeout) ack;
entry.ackTimeoutTask = new AckTimeoutTask(id, timeoutAck);
}
message.setId(id + "+");
synchronized (acknowledge) {
acknowledge.put(id, entry);
}
if (entry.ackTimeoutTask != null) {
long timeout = entry.ackTimeoutTask.ack.getTimeout();
backgroundTimer.schedule(entry.ackTimeoutTask, timeout);
}
}
}

Expand Down Expand Up @@ -525,6 +570,7 @@ public synchronized void transportConnected() {
if (reconnectTask != null) {
reconnectTask.cancel();
reconnectTask = null;
onReconnect();
}
resetTimeout();
this.keepAliveInQueue = false;
Expand Down Expand Up @@ -696,17 +742,25 @@ public void transportMessage(String text) {
if (data.length == 2) {
try {
int id = Integer.parseInt(data[0]);
IOAcknowledge ack = acknowledge.get(id);
if (ack == null)
logger.warning("Received unknown ack packet");
else {
AckEntry ackEntry = null;
synchronized (acknowledge) {
ackEntry = acknowledge.get(id);
if (ackEntry != null) {
ackEntry.ackTimeoutTask.cancel();
acknowledge.remove(id);
}
}
if (ackEntry != null) {
JsonArray array = new JsonParser().parse(data[1]).getAsJsonArray();
JsonElement[] args = new JsonElement[array.size()];
for (int i = 0; i < args.length; i++) {
args[i] = array.get(i);
}
ack.ack(args);
ackEntry.ack.ack(args);
}
else
logger.warning("Received unknown or timeouted ack packet");

} catch (NumberFormatException e) {
logger.warning("Received malformated Acknowledge! This is potentially filling up the acknowledges!");
} catch (JsonParseException e) {
Expand Down Expand Up @@ -903,16 +957,20 @@ public IOTransport getTransport() {

@Override
public void onDisconnect() {
SocketIO socket = sockets.get("");
if (socket != null)
socket.getCallback().onDisconnect();
for (SocketIO socket : sockets.values())
socket.getCallback().onDisconnect();
}

@Override
public void onConnect() {
SocketIO socket = sockets.get("");
if (socket != null)
socket.getCallback().onConnect();
for (SocketIO socket : sockets.values())
socket.getCallback().onConnect();
}

@Override
public void onReconnect() {
for (SocketIO socket : sockets.values())
socket.getCallback().onReconnect();
}

@Override
Expand Down
8 changes: 7 additions & 1 deletion src/test/java/io/socket/AbstractTestSocketIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,13 @@ public void onConnect() {
events.add("onConnect");
}

/*
@Override
public void onReconnect() {
System.out.println("onReconnect");
events.add("onReconnect");
}

/*
* (non-Javadoc)
*
* @see io.socket.IOCallback#onMessage(java.lang.String,
Expand Down