Skip to content

QUARKUS-3230 Write combining #428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 107 additions & 20 deletions src/main/java/org/jboss/logmanager/handlers/WriterHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@

package org.jboss.logmanager.handlers;

import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.Flushable;
import java.io.Writer;
import java.io.*;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.ErrorManager;
import java.util.logging.Formatter;

Expand All @@ -33,10 +35,46 @@
* A handler which writes to any {@code Writer}.
*/
public class WriterHandler extends ExtHandler {

private static final long MAX_PENDING_WRITES = 1024;
private static final AtomicLongFieldUpdater<WriterHandler> CLAIMED_WRITES_UPDATER = AtomicLongFieldUpdater
.newUpdater(WriterHandler.class, "claimedWrites");
private volatile boolean checkHeadEncoding = true;
private volatile boolean checkTailEncoding = true;
private Writer writer;
private volatile long claimedWrites;

private static final class WriteRequest {

private static final Object UNPARKED = new Object();
private static final AtomicReferenceFieldUpdater<WriteRequest, Object> BLOCKED = AtomicReferenceFieldUpdater
.newUpdater(WriteRequest.class, Object.class, "blocked");
final ExtLogRecord record;
final String formatted;
private volatile Thread blocked;

WriteRequest(ExtLogRecord record, String formatted) {
this.record = record;
this.formatted = formatted;
}

public void weakBlockUntilCompletion() {
if (BLOCKED.compareAndSet(this, null, Thread.currentThread())) {
// we have won, so we can park for real
LockSupport.park();
// spurious wakeup or interrupt can cause this so...
BLOCKED.compareAndSet(this, Thread.currentThread(), UNPARKED);
}
}

public void unblockIfAny() {
Object maybeBlocked = BLOCKED.getAndSet(this, UNPARKED);
if (maybeBlocked != null && maybeBlocked != UNPARKED) {
LockSupport.unpark((Thread) maybeBlocked);
}
}
}

private final Queue<WriteRequest> writes = new ConcurrentLinkedQueue<>();

/**
* Construct a new instance.
Expand All @@ -59,25 +97,74 @@ protected void doPublish(final ExtLogRecord record) {
return;
}
try {
lock.lock();
try {
if (writer == null) {
return;
write(record, formatted);
} catch (Exception ex) {
reportError("Error writing log message", ex, ErrorManager.WRITE_FAILURE);
}
}

private void lockedWrite(ExtLogRecord record, String formatted) throws IOException {
lock.lock();
try {
doWrite(record, formatted);
} finally {
lock.unlock();
}
}

private boolean doWrite(ExtLogRecord record, String formatted) throws IOException {
assert lock.isHeldByCurrentThread();
if (writer == null) {
return true;
}
preWrite(record);
final Writer writer = this.writer;
if (writer == null) {
return true;
}
writer.write(formatted);
// only flush if something was written
super.doPublish(record);
return false;
}

private void write(final ExtLogRecord record, final String formatted) throws IOException {
boolean block;
if (CLAIMED_WRITES_UPDATER.compareAndSet(this, 0, 1)) {
lockedWrite(record, formatted);
if (CLAIMED_WRITES_UPDATER.decrementAndGet(this) == 0) {
return;
}
} else {
WriteRequest writeRequest = new WriteRequest(record, formatted);
writes.offer(writeRequest);
long pendingWrites = CLAIMED_WRITES_UPDATER.getAndIncrement(this);
if (pendingWrites != 0) {
if (pendingWrites >= MAX_PENDING_WRITES) {
writeRequest.weakBlockUntilCompletion();
}
preWrite(record);
final Writer writer = this.writer;
if (writer == null) {
return;
return;
}
}
IOException firstIoEx = null;
do {
final WriteRequest writeRequest = writes.poll();
assert writeRequest != null;
try {
lockedWrite(writeRequest.record, writeRequest.formatted);
} catch (IOException ex) {
if (firstIoEx == null) {
firstIoEx = ex;
} else {
firstIoEx.addSuppressed(ex);
}
writer.write(formatted);
// only flush if something was written
super.doPublish(record);
} finally {
lock.unlock();
writeRequest.unblockIfAny();
}
} catch (Exception ex) {
reportError("Error writing log message", ex, ErrorManager.WRITE_FAILURE);
return;
} while (CLAIMED_WRITES_UPDATER.decrementAndGet(this) != 0);
if (firstIoEx != null) {
// it can bring all the others suppressed ones
throw firstIoEx;
}
}

Expand Down