Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 73 additions & 39 deletions src/main/java/net/sf/marineapi/nmea/io/UDPDataReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,58 +22,92 @@

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* DataReader implementation using DatagramSocket as data source.
*
*
* @author Kimmo Tuukkanen, Ludovic Drouineau
*/
class UDPDataReader extends AbstractDataReader {

private DatagramSocket socket;
private byte[] buffer = new byte[1024];
private Queue<String> queue = new LinkedList<>();
private DatagramSocket socket;
private byte[] buffer = new byte[1024];
private Queue<UDPPacket> queue = new LinkedList<>();
private Map<String, StringBuilder> senderBuffers = new ConcurrentHashMap<>();

/**
* Creates a new instance of StreamReader.
*
* @param socket DatagramSocket to be used as data source.
* @param parent SentenceReader dispatching events for this reader.
*/
UDPDataReader(DatagramSocket socket, SentenceReader parent) {
super(parent);
this.socket = socket;
}
private static class UDPPacket {
private final String data;
private final String senderKey;

public UDPPacket(String data, String senderKey) {
this.data = data;
this.senderKey = senderKey;
}
}

/**
* Creates a new instance of StreamReader.
*
* @param socket DatagramSocket to be used as data source.
* @param parent SentenceReader dispatching events for this reader.
*/
UDPDataReader(DatagramSocket socket, SentenceReader parent) {
super(parent);
this.socket = socket;
}

@Override
public String read() throws Exception {
UDPPacket packet;

@Override
public String read() throws Exception {
String data = null;

while (true) {
// If there is a backlog of sentences in the queue, then return the old sentences first so that each packet is uploaded compete.
if ((data = queue.poll()) != null)
break;
while (true) {
// If there is a backlog of sentences in the queue, then return the old sentences first so that each packet is uploaded compete.
if ((packet = queue.poll()) != null) {
break;
}

// Once the backlog is cleared, then read the port, split the packet into sentences,
// and store the individual sentences in the queue. Queue will always start empty here.
data = receive();
String[] lines = data.split("\\r?\\n");
queue.addAll(Arrays.asList(lines));
}

return data;
}
// Once the backlog is cleared, then read the port, split the packet into sentences,
// and store the individual sentences in the queue. Queue will always start empty here.
packet = receive();

/**
* Receive UDP packet and return as String. Blocks until data is received.
StringBuilder tempBuffer = senderBuffers.computeIfAbsent(
packet.senderKey,
k -> new StringBuilder()
);

tempBuffer.append(packet.data);

String[] lines = tempBuffer.toString().split("\\r?\\n", -1);

// Contains line breaks
if (lines.length > 1) {
for (int i = 0; i < lines.length - 1; i++) {
// Complete NMEA statement, add to queue
queue.add(new UDPPacket(lines[i], packet.senderKey));
}
tempBuffer.setLength(0);
tempBuffer.append(lines[lines.length - 1]);
}
}
return packet.data;
}

/**
* Receive UDP packet and return as UDPPacket. Blocks until data is received.
* Exceptions bubble up to the {@link AbstractDataReader}
*/
private String receive() throws Exception {
DatagramPacket pkg = new DatagramPacket(buffer, buffer.length);
socket.receive(pkg);
return new String(pkg.getData(), 0, pkg.getLength());
}
private UDPPacket receive() throws Exception {
DatagramPacket pkg = new DatagramPacket(buffer, buffer.length);
socket.receive(pkg);
String senderKey = pkg.getAddress().getHostAddress() + "_" + pkg.getPort();
return new UDPPacket(
new String(pkg.getData(), 0, pkg.getLength()),
senderKey
);
}

}
Loading