Skip to content

Commit f4c5abf

Browse files
committed
few fixes on IO
1 parent 828db3c commit f4c5abf

File tree

5 files changed

+72
-7
lines changed

5 files changed

+72
-7
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/io/IOUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ public void ready(Integer result) {
253253

254254
@Override
255255
public void error(IOException error) {
256+
if (ondone != null) ondone.run(new Pair<>(null, error));
256257
sp.unblockError(error);
257258
}
258259

net.lecousin.core/src/main/java/net/lecousin/framework/io/buffering/PreBufferedReadable.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,15 @@ public void run() {
262262
}
263263
buffer.flip();
264264
synchronized (PreBufferedReadable.this) {
265+
if (firstReadTask.isCancelled()) {
266+
if (dataReady != null) {
267+
SynchronizationPoint<NoException> dr = dataReady;
268+
dataReady = null;
269+
dr.unblock();
270+
}
271+
jpNextRead.joined();
272+
return;
273+
}
265274
Throwable e = firstReadTask.getError();
266275
if (singleRead && e == null && firstReadTask.getResult().intValue() < size)
267276
e = new IOException("Only " + firstReadTask.getResult().intValue()
@@ -420,7 +429,10 @@ public AsyncWork<Integer,IOException> readAsync(ByteBuffer buffer, RunnableWithP
420429
@Override
421430
public Integer run() throws IOException, CancelException {
422431
if (error != null) throw error;
423-
if (buffersReady == null) throw new CancelException("IO Closed");
432+
if (buffersReady == null) {
433+
if (endReached) return Integer.valueOf(-1); // case of empty readable
434+
throw new CancelException("IO Closed");
435+
}
424436
if (current == null) {
425437
if (endReached) return Integer.valueOf(-1);
426438
if (isClosing() || isClosed()) throw new CancelException("IO Closed");

net.lecousin.core/src/main/java/net/lecousin/framework/io/util/LimitWriteOperations.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
import net.lecousin.framework.collections.TurnArray;
77
import net.lecousin.framework.concurrent.synch.AsyncWork;
8+
import net.lecousin.framework.concurrent.synch.ISynchronizationPoint;
89
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
910
import net.lecousin.framework.exception.NoException;
1011
import net.lecousin.framework.io.IO;
@@ -28,6 +29,7 @@ public LimitWriteOperations(IO.Writable io, int maxOperations) {
2829
private IO.Writable io;
2930
private TurnArray<Pair<ByteBuffer,AsyncWork<Integer,IOException>>> waiting;
3031
private SynchronizationPoint<NoException> lock = null;
32+
private AsyncWork<Integer, IOException> lastWrite = new AsyncWork<>(Integer.valueOf(0), null);
3133

3234
/**
3335
* Queue the buffer to write. If there is no pending write, the write operation is started.
@@ -37,8 +39,8 @@ public LimitWriteOperations(IO.Writable io, int maxOperations) {
3739
public AsyncWork<Integer,IOException> write(ByteBuffer buffer) throws IOException {
3840
do {
3941
synchronized (waiting) {
40-
if (waiting.isEmpty()) {
41-
return io.writeAsync(buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
42+
if (waiting.isEmpty() && lastWrite.isUnblocked()) {
43+
return lastWrite = io.writeAsync(buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
4244
@Override
4345
public void run(Pair<Integer, IOException> param) {
4446
writeDone();
@@ -63,12 +65,12 @@ private void writeDone() {
6365
synchronized (waiting) {
6466
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.pollFirst();
6567
if (b != null) {
66-
io.writeAsync(b.getValue1(), new RunnableWithParameter<Pair<Integer,IOException>>() {
68+
(lastWrite = io.writeAsync(b.getValue1(), new RunnableWithParameter<Pair<Integer,IOException>>() {
6769
@Override
6870
public void run(Pair<Integer, IOException> param) {
6971
writeDone();
7072
}
71-
}).listenInline(b.getValue2());
73+
})).listenInline(b.getValue2());
7274
if (lock != null) {
7375
sp = lock;
7476
lock = null;
@@ -81,10 +83,17 @@ public void run(Pair<Integer, IOException> param) {
8183

8284
/** Return the last pending operation, or null. */
8385
public AsyncWork<Integer, IOException> getLastPendingOperation() {
84-
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.pollLast();
86+
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.peekLast();
8587
if (b == null)
86-
return null;
88+
return lastWrite.isUnblocked() ? null : lastWrite;
8789
return b.getValue2();
8890
}
91+
92+
/** Same as getLastPendingOperation but never return null (return an unblocked synchronization point instead). */
93+
public ISynchronizationPoint<IOException> flush() {
94+
ISynchronizationPoint<IOException> sp = getLastPendingOperation();
95+
if (sp == null) sp = new SynchronizationPoint<>(true);
96+
return sp;
97+
}
8998

9099
}

net.lecousin.core/src/test/java/net/lecousin/framework/core/test/io/TestReadableSeekable.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ public void run() {
215215
sp.error(read.get().getError());
216216
return;
217217
}
218+
if (read.get().isCancelled()) {
219+
sp.cancel(read.get().getCancelEvent());
220+
return;
221+
}
218222
if (!onDoneBefore.get()) {
219223
sp.error(new Exception("Method readFullyAsync didn't call ondone before listeners"));
220224
return;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package net.lecousin.framework.core.tests.io.util;
2+
3+
import java.io.File;
4+
import java.nio.ByteBuffer;
5+
6+
import net.lecousin.framework.concurrent.Task;
7+
import net.lecousin.framework.core.test.LCCoreAbstractTest;
8+
import net.lecousin.framework.io.FileIO;
9+
import net.lecousin.framework.io.util.LimitWriteOperations;
10+
11+
import org.junit.Assert;
12+
import org.junit.Test;
13+
14+
public class TestLimitWriteOperations extends LCCoreAbstractTest {
15+
16+
@Test
17+
public void test() throws Exception {
18+
File tmp = File.createTempFile("test", "lwo");
19+
tmp.deleteOnExit();
20+
FileIO.WriteOnly io = new FileIO.WriteOnly(tmp, Task.PRIORITY_NORMAL);
21+
LimitWriteOperations writeOps = new LimitWriteOperations(io, 3);
22+
byte[] data = new byte[100];
23+
for (int i = 0; i < data.length; ++i)
24+
data[i] = (byte)i;
25+
for (int i = 0; i < 500; ++i)
26+
writeOps.write(ByteBuffer.wrap(data));
27+
writeOps.flush().blockThrow(0);
28+
io.close();
29+
FileIO.ReadOnly in = new FileIO.ReadOnly(tmp, Task.PRIORITY_NORMAL);
30+
byte[] buf = new byte[data.length];
31+
for (int i = 0; i < 500; ++i) {
32+
Assert.assertEquals(data.length, in.readFullySync(ByteBuffer.wrap(buf)));
33+
Assert.assertArrayEquals(data, buf);
34+
}
35+
Assert.assertTrue(in.readFullySync(ByteBuffer.wrap(buf)) <= 0);
36+
in.close();
37+
}
38+
39+
}

0 commit comments

Comments
 (0)