diff --git a/src/main/java/io/socket/IOAcknowledgeWithTimeout.java b/src/main/java/io/socket/IOAcknowledgeWithTimeout.java new file mode 100644 index 0000000..e47fdd7 --- /dev/null +++ b/src/main/java/io/socket/IOAcknowledgeWithTimeout.java @@ -0,0 +1,21 @@ +package io.socket; + +/** + * The interface acknowledge that supports timeout + * @author pjf + * + */ +public interface IOAcknowledgeWithTimeout extends IOAcknowledge { + + /** + * Returns the desired timeout for the message + * @return timeout in ms + */ + long getTimeout(); + + /** + * Called when the acknowledgement does not arrive in time + */ + void timeout(); + +} diff --git a/src/main/java/io/socket/IOCallback.java b/src/main/java/io/socket/IOCallback.java index 273d791..4b86691 100644 --- a/src/main/java/io/socket/IOCallback.java +++ b/src/main/java/io/socket/IOCallback.java @@ -24,8 +24,13 @@ public interface IOCallback { * On connect. Called when the socket becomes ready so it is now able to receive data */ void onConnect(); - - /** + + /** + * On reconnect. Called when the socket successfully reconnects + */ + void onReconnect(); + + /** * On message. Called when the server sends String data. * * @param data the data. diff --git a/src/main/java/io/socket/IOConnection.java b/src/main/java/io/socket/IOConnection.java index 324ab25..ddcb92e 100644 --- a/src/main/java/io/socket/IOConnection.java +++ b/src/main/java/io/socket/IOConnection.java @@ -8,6 +8,13 @@ */ package io.socket; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; + import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; @@ -29,13 +36,6 @@ import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonParser; -import com.google.gson.JsonPrimitive; - /** * The Class IOConnection. */ @@ -129,7 +129,7 @@ class IOConnection implements IOCallback { private int nextId = 1; /** Acknowledges. */ - HashMap acknowledge = new HashMap(); + HashMap acknowledge = new HashMap(); /** true if there's already a keepalive in {@link #outputBuffer}. */ private boolean keepAliveInQueue; @@ -382,6 +382,39 @@ public void ack(JsonElement... args) { }; } + private static class AckEntry { + AckTimeoutTask ackTimeoutTask; + IOAcknowledge ack; + } + + /** + * The Class AckTimeoutTask. Handles timeouts on ACK + */ + private class AckTimeoutTask extends TimerTask { + + int ackId; + IOAcknowledgeWithTimeout ack; + + AckTimeoutTask(int ackId, IOAcknowledgeWithTimeout ack) { + this.ackId = ackId; + this.ack = ack; + } + + /* + * (non-Javadoc) + * + * @see java.util.TimerTask#run() + */ + @Override + public void run() { + synchronized (acknowledge) { + acknowledge.remove(ackId); + } + logger.warning("! ack " + ackId + " timeout"); + ack.timeout(); + } + } + /** * adds an {@link IOAcknowledge} to an {@link IOMessage}. * @@ -393,8 +426,20 @@ public void ack(JsonElement... args) { private void synthesizeAck(IOMessage message, IOAcknowledge ack) { if (ack != null) { int id = nextId++; - acknowledge.put(id, ack); + AckEntry entry = new AckEntry(); + entry.ack = ack; + if (ack instanceof IOAcknowledgeWithTimeout) { + IOAcknowledgeWithTimeout timeoutAck = (IOAcknowledgeWithTimeout) ack; + entry.ackTimeoutTask = new AckTimeoutTask(id, timeoutAck); + } message.setId(id + "+"); + synchronized (acknowledge) { + acknowledge.put(id, entry); + } + if (entry.ackTimeoutTask != null) { + long timeout = entry.ackTimeoutTask.ack.getTimeout(); + backgroundTimer.schedule(entry.ackTimeoutTask, timeout); + } } } @@ -525,6 +570,7 @@ public synchronized void transportConnected() { if (reconnectTask != null) { reconnectTask.cancel(); reconnectTask = null; + onReconnect(); } resetTimeout(); this.keepAliveInQueue = false; @@ -696,17 +742,25 @@ public void transportMessage(String text) { if (data.length == 2) { try { int id = Integer.parseInt(data[0]); - IOAcknowledge ack = acknowledge.get(id); - if (ack == null) - logger.warning("Received unknown ack packet"); - else { + AckEntry ackEntry = null; + synchronized (acknowledge) { + ackEntry = acknowledge.get(id); + if (ackEntry != null) { + ackEntry.ackTimeoutTask.cancel(); + acknowledge.remove(id); + } + } + if (ackEntry != null) { JsonArray array = new JsonParser().parse(data[1]).getAsJsonArray(); JsonElement[] args = new JsonElement[array.size()]; for (int i = 0; i < args.length; i++) { args[i] = array.get(i); } - ack.ack(args); + ackEntry.ack.ack(args); } + else + logger.warning("Received unknown or timeouted ack packet"); + } catch (NumberFormatException e) { logger.warning("Received malformated Acknowledge! This is potentially filling up the acknowledges!"); } catch (JsonParseException e) { @@ -903,16 +957,20 @@ public IOTransport getTransport() { @Override public void onDisconnect() { - SocketIO socket = sockets.get(""); - if (socket != null) - socket.getCallback().onDisconnect(); + for (SocketIO socket : sockets.values()) + socket.getCallback().onDisconnect(); } @Override public void onConnect() { - SocketIO socket = sockets.get(""); - if (socket != null) - socket.getCallback().onConnect(); + for (SocketIO socket : sockets.values()) + socket.getCallback().onConnect(); + } + + @Override + public void onReconnect() { + for (SocketIO socket : sockets.values()) + socket.getCallback().onReconnect(); } @Override diff --git a/src/test/java/io/socket/AbstractTestSocketIO.java b/src/test/java/io/socket/AbstractTestSocketIO.java index e37d356..ab982b4 100644 --- a/src/test/java/io/socket/AbstractTestSocketIO.java +++ b/src/test/java/io/socket/AbstractTestSocketIO.java @@ -497,7 +497,13 @@ public void onConnect() { events.add("onConnect"); } - /* + @Override + public void onReconnect() { + System.out.println("onReconnect"); + events.add("onReconnect"); + } + + /* * (non-Javadoc) * * @see io.socket.IOCallback#onMessage(java.lang.String,