diff --git a/src/main/java/net/sf/marineapi/nmea/io/UDPDataReader.java b/src/main/java/net/sf/marineapi/nmea/io/UDPDataReader.java index f6cbfc80..5ebadfdc 100644 --- a/src/main/java/net/sf/marineapi/nmea/io/UDPDataReader.java +++ b/src/main/java/net/sf/marineapi/nmea/io/UDPDataReader.java @@ -22,58 +22,92 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; -import java.util.Arrays; import java.util.LinkedList; import java.util.Queue; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * DataReader implementation using DatagramSocket as data source. - * + * * @author Kimmo Tuukkanen, Ludovic Drouineau */ class UDPDataReader extends AbstractDataReader { - private DatagramSocket socket; - private byte[] buffer = new byte[1024]; - private Queue queue = new LinkedList<>(); + private DatagramSocket socket; + private byte[] buffer = new byte[1024]; + private Queue queue = new LinkedList<>(); + private Map senderBuffers = new ConcurrentHashMap<>(); - /** - * Creates a new instance of StreamReader. - * - * @param socket DatagramSocket to be used as data source. - * @param parent SentenceReader dispatching events for this reader. - */ - UDPDataReader(DatagramSocket socket, SentenceReader parent) { - super(parent); - this.socket = socket; - } + private static class UDPPacket { + private final String data; + private final String senderKey; + + public UDPPacket(String data, String senderKey) { + this.data = data; + this.senderKey = senderKey; + } + } + + /** + * Creates a new instance of StreamReader. + * + * @param socket DatagramSocket to be used as data source. + * @param parent SentenceReader dispatching events for this reader. + */ + UDPDataReader(DatagramSocket socket, SentenceReader parent) { + super(parent); + this.socket = socket; + } + + @Override + public String read() throws Exception { + UDPPacket packet; - @Override - public String read() throws Exception { - String data = null; - - while (true) { - // If there is a backlog of sentences in the queue, then return the old sentences first so that each packet is uploaded compete. - if ((data = queue.poll()) != null) - break; + while (true) { + // If there is a backlog of sentences in the queue, then return the old sentences first so that each packet is uploaded compete. + if ((packet = queue.poll()) != null) { + break; + } - // Once the backlog is cleared, then read the port, split the packet into sentences, - // and store the individual sentences in the queue. Queue will always start empty here. - data = receive(); - String[] lines = data.split("\\r?\\n"); - queue.addAll(Arrays.asList(lines)); - } - - return data; - } + // Once the backlog is cleared, then read the port, split the packet into sentences, + // and store the individual sentences in the queue. Queue will always start empty here. + packet = receive(); - /** - * Receive UDP packet and return as String. Blocks until data is received. + StringBuilder tempBuffer = senderBuffers.computeIfAbsent( + packet.senderKey, + k -> new StringBuilder() + ); + + tempBuffer.append(packet.data); + + String[] lines = tempBuffer.toString().split("\\r?\\n", -1); + + // Contains line breaks + if (lines.length > 1) { + for (int i = 0; i < lines.length - 1; i++) { + // Complete NMEA statement, add to queue + queue.add(new UDPPacket(lines[i], packet.senderKey)); + } + tempBuffer.setLength(0); + tempBuffer.append(lines[lines.length - 1]); + } + } + return packet.data; + } + + /** + * Receive UDP packet and return as UDPPacket. Blocks until data is received. * Exceptions bubble up to the {@link AbstractDataReader} */ - private String receive() throws Exception { - DatagramPacket pkg = new DatagramPacket(buffer, buffer.length); - socket.receive(pkg); - return new String(pkg.getData(), 0, pkg.getLength()); - } + private UDPPacket receive() throws Exception { + DatagramPacket pkg = new DatagramPacket(buffer, buffer.length); + socket.receive(pkg); + String senderKey = pkg.getAddress().getHostAddress() + "_" + pkg.getPort(); + return new UDPPacket( + new String(pkg.getData(), 0, pkg.getLength()), + senderKey + ); + } + }