diff --git a/src/main/java/net/openhft/chronicle/queue/impl/RollingChronicleQueue.java b/src/main/java/net/openhft/chronicle/queue/impl/RollingChronicleQueue.java index b5df032878..73a5d615c7 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/RollingChronicleQueue.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/RollingChronicleQueue.java @@ -20,6 +20,7 @@ import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.RollCycle; import net.openhft.chronicle.queue.TailerDirection; +import net.openhft.chronicle.queue.impl.single.FileShrinkage; import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueStore; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -110,4 +111,8 @@ public interface RollingChronicleQueue extends ChronicleQueue { * @return the checkpointInterval used by delta wire */ int deltaCheckpointInterval(); + + default FileShrinkage fileShrinkage() { + return FileShrinkage.SHRINK_ASYNCHRONOUSLY; + } } diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/FileShrinkage.java b/src/main/java/net/openhft/chronicle/queue/impl/single/FileShrinkage.java new file mode 100644 index 0000000000..32badbdcad --- /dev/null +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/FileShrinkage.java @@ -0,0 +1,5 @@ +package net.openhft.chronicle.queue.impl.single; + +public enum FileShrinkage { + NONE, SHRINK_SYNCHRONOUSLY, SHRINK_ASYNCHRONOUSLY; +} diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/MetaDataField.java b/src/main/java/net/openhft/chronicle/queue/impl/single/MetaDataField.java index d387d8288f..fe771614df 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/MetaDataField.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/MetaDataField.java @@ -16,7 +16,8 @@ public enum MetaDataField implements WireKey { lastIndexReplicated, sourceId, dataFormat, - metadata; + metadata, + fileShrinkage; @Nullable @Override diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/QueueFileShrinkManager.java b/src/main/java/net/openhft/chronicle/queue/impl/single/QueueFileShrinkManager.java index b6e7e0d2c8..da5c840151 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/QueueFileShrinkManager.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/QueueFileShrinkManager.java @@ -28,8 +28,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import static net.openhft.chronicle.queue.impl.single.FileShrinkage.*; + public enum QueueFileShrinkManager { - ; // none + ; public static final String THREAD_NAME = "queue~file~shrink~daemon"; // don't use this with a Pretoucher enabled! public static final boolean RUN_SYNCHRONOUSLY = Jvm.getBoolean("chronicle.queue.synchronousFileShrinking"); @@ -38,10 +40,23 @@ public enum QueueFileShrinkManager { private static final ScheduledExecutorService EXECUTOR = Threads.acquireScheduledExecutorService(THREAD_NAME, true); private static final long DELAY_S = 10; - public static void scheduleShrinking(@NotNull final File queueFile, final long writePos) { + public static FileShrinkage defaultFileShrink() { + if (DISABLE_QUEUE_FILE_SHRINKING) - return; + return NONE; + if (RUN_SYNCHRONOUSLY) + return SHRINK_SYNCHRONOUSLY; + + return SHRINK_ASYNCHRONOUSLY; + } + + public static void scheduleShrinking(@NotNull final File queueFile, final long writePos, @NotNull FileShrinkage fileShrink) { + + if (fileShrink == NONE) + return; + + if (fileShrink == SHRINK_SYNCHRONOUSLY) task(queueFile, writePos); else { // The shrink is deferred a bit to allow any potentially lagging tailers/pre-touchers @@ -63,6 +78,10 @@ private static void task(@NotNull final File queueFile, final long writePos) { } try (RandomAccessFile raf = new RandomAccessFile(queueFile, "rw")) { raf.setLength(writePos); + System.out.println("setLength=" + writePos + ", raf=" + queueFile.getAbsolutePath()); + + System.out.println("reaad=" + new RandomAccessFile(queueFile, "r").length()); + } catch (IOException ex) { // on microsoft windows, keep retrying until the file is unmapped diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java index bcedf78c03..a7271feeac 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueue.java @@ -129,6 +129,10 @@ public class SingleChronicleQueue extends AbstractCloseable implements RollingCh private final boolean useSparseFile; private final long sparseCapacity; final AppenderListener appenderListener; + + @NotNull + private final FileShrinkage fileShrink; + protected int sourceId; @NotNull private Condition createAppenderCondition = NoOpCondition.INSTANCE; @@ -138,6 +142,7 @@ public class SingleChronicleQueue extends AbstractCloseable implements RollingCh protected SingleChronicleQueue(@NotNull final SingleChronicleQueueBuilder builder) { try { + fileShrink = builder.fileShrinkage(); rollCycle = builder.rollCycle(); cycleCalculator = cycleCalculator(builder.rollTimeZone()); epoch = builder.epoch(); @@ -436,6 +441,11 @@ public int deltaCheckpointInterval() { return deltaCheckpointInterval; } + @Override + public FileShrinkage fileShrinkage() { + return fileShrink; + } + /** * @return if we uses a ring buffer to buffer the appends, the Excerpts are written to the Chronicle Queue using a background thread */ diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java index ecf9dbac7b..bbb5c3f0a6 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueBuilder.java @@ -66,6 +66,7 @@ import static java.util.Objects.requireNonNull; import static net.openhft.chronicle.core.pool.ClassAliasPool.CLASS_ALIASES; +import static net.openhft.chronicle.queue.impl.single.QueueFileShrinkManager.defaultFileShrink; import static net.openhft.chronicle.queue.impl.single.SingleChronicleQueue.QUEUE_METADATA_FILE; import static net.openhft.chronicle.wire.WireType.DEFAULT_ZERO_BINARY; import static net.openhft.chronicle.wire.WireType.DELTA_BINARY; @@ -163,6 +164,25 @@ public static void addAliases() { // static initialiser. } + + private FileShrinkage fileShrinkage = defaultFileShrink(); + + /** + * @return if set, shrinks the .cq4 file after roll + */ + public FileShrinkage fileShrinkage() { + return fileShrinkage; + } + + /** + * @return sets shrinks the .cq4 file after roll, or use net.openhft.chronicle.queue.impl.single.FileShrink#NONE + * if not required, default is net.openhft.chronicle.queue.impl.single.FileShrink#SHRINK_ASYNCHRONOUSLY + */ + public SingleChronicleQueueBuilder fileShrinkage(FileShrinkage fileShrink) { + this.fileShrinkage = fileShrink; + return this; + } + /** * @return an empty builder */ @@ -262,6 +282,7 @@ static SingleChronicleQueueStore createStore(@NotNull RollingChronicleQueue queu queue.indexCount(), queue.indexSpacing()); + wireStore.fileShrinkage(queue.fileShrinkage()); wire.writeEventName(MetaDataKeys.header).typedMarshallable(wireStore); return wireStore; } @@ -1065,7 +1086,7 @@ public Supplier<BiConsumer<BytesStore, Bytes<?>>> decodingSupplier() { } public SingleChronicleQueueBuilder codingSuppliers(@Nullable - Supplier<BiConsumer<BytesStore, Bytes<?>>> encodingSupplier, + Supplier<BiConsumer<BytesStore, Bytes<?>>> encodingSupplier, @Nullable Supplier<BiConsumer<BytesStore, Bytes<?>>> decodingSupplier) { if ((encodingSupplier == null) != (decodingSupplier == null)) throw new UnsupportedOperationException("Both encodingSupplier and decodingSupplier must be set or neither"); diff --git a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java index 304463fc8d..d2fc928df6 100644 --- a/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java +++ b/src/main/java/net/openhft/chronicle/queue/impl/single/SingleChronicleQueueStore.java @@ -39,6 +39,8 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; +import static net.openhft.chronicle.queue.impl.single.QueueFileShrinkManager.*; + public class SingleChronicleQueueStore extends AbstractCloseable implements WireStore { static { ClassAliasPool.CLASS_ALIASES.addAlias(SCQIndexing.class); @@ -57,6 +59,9 @@ public class SingleChronicleQueueStore extends AbstractCloseable implements Wire @NotNull private final transient Sequence sequence; + @NotNull + private FileShrinkage fileShrinkage = defaultFileShrink(); + private int cycle; /** @@ -77,6 +82,14 @@ private SingleChronicleQueueStore(@NotNull WireIn wire) { this.indexing.writePosition = writePosition; this.sequence = new RollCycleEncodeSequence(writePosition, rollIndexCount(), rollIndexSpacing()); this.indexing.sequence = sequence; + + try { + ValueIn read = wire.read(MetaDataField.fileShrinkage); + if (read.isPresent()) + this.fileShrinkage = read.asEnum(FileShrinkage.class); + } catch (Exception ignore) { + } + if (wire.bytes().readRemaining() > 0) { final int version = wire.read(MetaDataField.dataFormat).int32(); this.dataVersion = version > 1 ? 0 : version; @@ -341,6 +354,7 @@ public void writeMarshallable(@NotNull WireOut wire) { intForBinding(wireOut, writePosition) .write(MetaDataField.indexing).typedMarshallable(this.indexing) .write(MetaDataField.dataFormat).int32(dataVersion); + wire.write(MetaDataField.fileShrinkage).asEnum(fileShrinkage); } @Override @@ -408,10 +422,19 @@ public boolean writeEOF(@NotNull Wire wire, long timeoutMS) { } } + public FileShrinkage fileShrinkage() { + return fileShrinkage; + } + + public SingleChronicleQueueStore fileShrinkage(@NotNull FileShrinkage fileShrinkage) { + this.fileShrinkage = fileShrinkage; + return this; + } + boolean writeEOFAndShrink(@NotNull Wire wire, long timeoutMS) { if (wire.writeEndOfWire(timeoutMS, TimeUnit.MILLISECONDS, writePosition())) { // only if we just written EOF - QueueFileShrinkManager.scheduleShrinking(mappedFile.file(), wire.bytes().writePosition()); + scheduleShrinking(mappedFile.file(), wire.bytes().writePosition(), fileShrinkage); return true; } return false; diff --git a/src/test/java/net/openhft/chronicle/queue/impl/single/FileShrinkageTest.java b/src/test/java/net/openhft/chronicle/queue/impl/single/FileShrinkageTest.java new file mode 100644 index 0000000000..6735b3388f --- /dev/null +++ b/src/test/java/net/openhft/chronicle/queue/impl/single/FileShrinkageTest.java @@ -0,0 +1,54 @@ +package net.openhft.chronicle.queue.impl.single; + +import net.openhft.chronicle.core.time.SetTimeProvider; +import net.openhft.chronicle.queue.ChronicleQueueTestBase; +import net.openhft.chronicle.queue.ExcerptAppender; +import net.openhft.chronicle.queue.RollCycles; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; + +public class FileShrinkageTest extends ChronicleQueueTestBase { + + @Test + public void testShrinkSynchronously() throws IOException, InterruptedException { + + final File dataDir = getTmpDir(); + final SetTimeProvider timeProvider = new SetTimeProvider(); + + File file; + try (final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dataDir) + .rollCycle(RollCycles.TEST_SECONDLY) + .fileShrinkage(FileShrinkage.SHRINK_SYNCHRONOUSLY) + .timeProvider(timeProvider).build()) { + final ExcerptAppender excerptAppender = queue.acquireAppender(); + excerptAppender.writeText("hello"); + file = excerptAppender.currentFile(); + } + + timeProvider.advanceMillis(2_000); + Thread.sleep(2_000); + + try (final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dataDir) + .rollCycle(RollCycles.TEST_SECONDLY) + .timeProvider(timeProvider) + .fileShrinkage(FileShrinkage.SHRINK_SYNCHRONOUSLY) + .build()) { + + + // we should not have to do this, but even if we do it still does not work. + // queue.acquireAppender(); + + try (final RandomAccessFile raf = new RandomAccessFile(file, "r")) { + final long len = raf.length(); + System.out.println("len=" + len + ", file=" + file.getAbsolutePath()); + Assert.assertTrue("len=" + len, len > 520000 && len < 530000); + } + } + } + + +}