
Added SharedBatchWriterQueue for SharedBatchWriter object #5561


Draft · wants to merge 4 commits into base: 2.1
10 changes: 10 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1584,6 +1584,16 @@ public enum Property {
"The interval at which to check for external compaction final state markers in the metadata table.",
"2.1.0"),
@Experimental
COMPACTION_COORDINATOR_FINALIZER_WRITER_QUEUE(
"compaction.coordinator.compaction.finalizer.queue.impl",
"org.apache.accumulo.coordinator.BlockingSharedBatchWriterQueue.class",
PropertyType.CLASSNAME,
"SharedBatchWriterQueue implementation class name to use when queueing mutations"
+ " in the CompactionFinalizerWriters. The default implementation has a fixed size, 1/16th of COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE,"
+ " and Compactor RPC threads will block when trying to add work to the queue and will wait for the mutation to be flushed"
+ " to the metadata table.",
"2.1.4"),
@Experimental
COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE(
"compaction.coordinator.compaction.finalizer.queue.size", "16384", PropertyType.COUNT,
"The number of completed compactions to buffer in memory before blocking.", "2.1.4"),
109 changes: 109 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/spi/util/SharedBatchWriterQueue.java
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.spi.util;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

import org.apache.accumulo.core.data.Mutation;

/**
* Queue of mutations used by the SharedBatchWriter
*
* @since 2.1.4
*/
public interface SharedBatchWriterQueue {

class Work {
private final String id;
private final Mutation mutation;
private final CompletableFuture<Void> future;

public Work(String id, Mutation mutation) {
this.id = id;
this.mutation = mutation;
this.future = new CompletableFuture<>();
}

public Mutation getMutation() {
return mutation;
}

public String getId() {
return id;
}

public CompletableFuture<Void> getFuture() {
return future;
}

}

/**
* Initialization parameters
*
* @since 2.1.4
*/
interface InitParameters {
int getQueueSize();
}

/**
* Initialize the SharedBatchWriterQueue
*
* @param params initialization parameters
* @since 2.1.4
*/
void init(InitParameters params);

/**
* Add the {@code Work} item to the queue
*
* @param work work to be added to the queue
* @throws InterruptedException interrupted while waiting
* @since 2.1.4
*/
void add(Work work) throws InterruptedException;

/**
* Remove a {@code Work} item from the queue
*
* @return work item
* @throws InterruptedException interrupted while waiting for item
* @since 2.1.4
*/
Work remove() throws InterruptedException;

/**
* Remove all {@code Work} items from the queue into the supplied Collection
*
* @param work destination for the removed items
* @throws InterruptedException interrupted while filling destination
* @since 2.1.4
*/
void removeAll(Collection<Work> work) throws InterruptedException;

/**
* Return an Iterator over the {@code Work} elements in the queue
*
* @return iterator
* @since 2.1.4
*/
Iterator<Work> iterator();

}
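To illustrate the SPI contract, here is a hypothetical alternative implementation (not part of this PR) backed by a LinkedBlockingQueue. Unlike the default blocking implementation, its add() returns as soon as the work is enqueued instead of waiting for the flush; callers that need completion would observe Work.getFuture() themselves.

package org.apache.accumulo.core.spi.util; // hypothetical placement for this sketch

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

public class FireAndForgetSharedBatchWriterQueue implements SharedBatchWriterQueue {

  private LinkedBlockingQueue<Work> queue;

  @Override
  public void init(InitParameters params) {
    // Bounded to the configured size, like the default implementation.
    queue = new LinkedBlockingQueue<>(params.getQueueSize());
  }

  @Override
  public void add(Work work) throws InterruptedException {
    // Blocks only while the queue is full, not until the mutation is flushed.
    queue.put(work);
  }

  @Override
  public Work remove() throws InterruptedException {
    return queue.take();
  }

  @Override
  public void removeAll(Collection<Work> work) {
    queue.drainTo(work);
  }

  @Override
  public Iterator<Work> iterator() {
    return queue.iterator();
  }
}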
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java → core/src/main/java/org/apache/accumulo/core/util/SharedBatchWriter.java
@@ -16,95 +16,80 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.coordinator;
package org.apache.accumulo.core.util;

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.spi.util.SharedBatchWriterQueue;
import org.apache.accumulo.core.spi.util.SharedBatchWriterQueue.Work;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

/**
* This class supports the use case of many threads writing a single mutation to a table. It avoids
* each thread creating its own batch writer which creates threads and makes 3 RPCs to write the
* single mutation. Using this class results in much less thread creation and RPCs.
* This class supports the use case of many threads writing mutations to a table. Instead of each
* thread creating its own batch writer, each thread can add mutations to the queue that this
* shared batch writer reads from. This is more efficient than creating a batch writer to add a
* single mutation to a table as the batch writer would make 3 RPCs to write the single mutation.
* Using this class results in much less thread creation and RPCs.
*/
public class SharedBatchWriter {
private static final Logger log = LoggerFactory.getLogger(SharedBatchWriter.class);
private final Character prefix;

private static class Work {
private final Mutation mutation;
private final CompletableFuture<Void> future;

private Work(Mutation mutation) {
this.mutation = mutation;
this.future = new CompletableFuture<>();
}
}

private final BlockingQueue<Work> mutations;
private final SharedBatchWriterQueue queue;
private final String table;
private final ServerContext context;
private final ClientContext context;

public SharedBatchWriter(String table, Character prefix, ServerContext context, int queueSize) {
Preconditions.checkArgument(queueSize > 0, "illegal queue size %s", queueSize);
public SharedBatchWriter(String table, Character prefix, ClientContext context,
SharedBatchWriterQueue queue) {
Objects.requireNonNull(table, "Missing table");
Objects.requireNonNull(context, "Missing context");
Objects.requireNonNull(queue, "Missing queue");
this.table = table;
this.prefix = prefix;
this.context = context;
this.mutations = new ArrayBlockingQueue<>(queueSize);
this.queue = queue;
var thread = Threads.createCriticalThread(
"shared batch writer for " + table + " prefix:" + prefix, this::processMutations);
thread.start();
}

public void write(Mutation m) {
try {
var work = new Work(m);
mutations.put(work);
work.future.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
}

private void processMutations() {
Timer timer = Timer.startNew();
while (true) {
ArrayList<Work> batch = new ArrayList<>();
try {
batch.add(mutations.take());
batch.add(queue.remove());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}

try (var writer = context.createBatchWriter(table)) {
mutations.drainTo(batch);
try {
queue.removeAll(batch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
timer.restart();
for (var work : batch) {
writer.addMutation(work.mutation);
writer.addMutation(work.getMutation());
}
writer.flush();
log.trace("Wrote {} mutations in {}ms for prefix {}", batch.size(),
timer.elapsed(TimeUnit.MILLISECONDS), prefix);
batch.forEach(work -> work.future.complete(null));
batch.forEach(work -> work.getFuture().complete(null));
} catch (TableNotFoundException | MutationsRejectedException e) {
log.debug("Failed to process {} mutations in {}ms for prefix {}", batch.size(),
timer.elapsed(TimeUnit.MILLISECONDS), prefix, e);
batch.forEach(work -> work.future.completeExceptionally(e));
batch.forEach(work -> work.getFuture().completeExceptionally(e));
}
}
}
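A note on the semantics above: with the default blocking queue, add() joins the Work future, so a Compactor RPC thread that submits a mutation is parked until the shared writer thread flushes the batch, and a failed flush surfaces to that thread as an unchecked CompletionException from join() once completeExceptionally() is called. The bounded queue also provides backpressure, so RPC threads slow down rather than buffering unbounded work when the metadata table is slow to accept writes.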
63 changes: 63 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/util/SharedBatchWriterBlockingQueue.java
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.util;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;

import org.apache.accumulo.core.spi.util.SharedBatchWriterQueue;

/**
* SharedBatchWriterQueue backed by an ArrayBlockingQueue. Calls to
* {@link #add(SharedBatchWriterQueue.Work)} will block if the
* backing queue is full and will wait until the queued Work item has been flushed to the underlying
* table.
*/
public class SharedBatchWriterBlockingQueue implements SharedBatchWriterQueue {

private ArrayBlockingQueue<Work> queue = null;

@Override
public void init(InitParameters params) {
queue = new ArrayBlockingQueue<>(params.getQueueSize());
}

@Override
public void add(Work work) throws InterruptedException {
queue.put(work);
work.getFuture().join();
}

@Override
public Work remove() throws InterruptedException {
return queue.take();
}

@Override
public void removeAll(Collection<Work> work) {
queue.drainTo(work);
}

@Override
public Iterator<Work> iterator() {
return queue.iterator();
}

}
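Putting the pieces together, a minimal usage sketch might look like the following. The table name, prefix, id, and queue size are placeholders for illustration, and the ClientContext is assumed to be in scope; it also assumes callers submit Work directly to the queue, since the blocking implementation's add() both enqueues and waits for the flush.

import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.spi.util.SharedBatchWriterQueue;
import org.apache.accumulo.core.util.SharedBatchWriter;
import org.apache.accumulo.core.util.SharedBatchWriterBlockingQueue;

class SharedBatchWriterUsageSketch {
  static void example(ClientContext context) throws InterruptedException {
    SharedBatchWriterQueue queue = new SharedBatchWriterBlockingQueue();
    queue.init(() -> 1024); // InitParameters has a single method, so a lambda works

    // The writer starts a background thread that drains the queue and batches writes.
    SharedBatchWriter writer = new SharedBatchWriter("accumulo.metadata", 'a', context, queue);

    Mutation m = new Mutation("row");
    m.put("fam", "qual", "val");

    // With SharedBatchWriterBlockingQueue, add() returns only after the
    // background thread has flushed this mutation (or throws if the flush fails).
    queue.add(new SharedBatchWriterQueue.Work("work-1", m));
  }
}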
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -454,6 +454,10 @@ protected long getTServerCheckInterval() {
.getTimeInMillis(Property.COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL);
}

CompactionFinalizer getFinalizer() {
return compactionFinalizer;
}

/**
* Callback for the LiveTServerSet object to update current set of tablet servers, including ones
* that were deleted and added
server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -29,9 +29,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -72,16 +72,13 @@ public class CompactionFinalizer {
private final ExecutorService backgroundExecutor;
private final BlockingQueue<ExternalCompactionFinalState> pendingNotifications;
private final long tserverCheckInterval;
private final ConcurrentHashMap<Character,SharedBatchWriter> writers =
new ConcurrentHashMap<>(16);
private final int queueSize;
private final CompactionFinalizerWriters writers;

protected CompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor schedExecutor) {
this.context = context;
queueSize =
context.getConfiguration().getCount(Property.COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE);

this.pendingNotifications = new ArrayBlockingQueue<>(queueSize);
this.writers = new CompactionFinalizerWriters(this.context);
this.pendingNotifications = new ArrayBlockingQueue<>(writers.getQueueSize());

tserverCheckInterval = this.context.getConfiguration()
.getTimeInMillis(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL);
@@ -105,10 +102,10 @@ protected CompactionFinalizer(ServerContext context, ScheduledThreadPoolExecutor schedExecutor) {

}

private SharedBatchWriter getWriter(ExternalCompactionId ecid) {
return writers.computeIfAbsent(ecid.getFirstUUIDChar(),
(prefix) -> new SharedBatchWriter(Ample.DataLevel.USER.metaTable(), prefix, context,
queueSize / 16));
public Set<ExternalCompactionId> getQueuedCompletedCompactions() {
Set<ExternalCompactionId> ids = writers.getQueuedIds();
pendingNotifications.forEach(ecfs -> ids.add(ecfs.getExternalCompactionId()));
return ids;
}

public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize,
@@ -117,14 +114,12 @@ public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent, long fileSize,
var ecfs =
new ExternalCompactionFinalState(ecid, extent, FinalState.FINISHED, fileSize, fileEntries);

SharedBatchWriter writer = getWriter(ecid);

LOG.trace("Initiating commit for external compaction: {} {}", ecid, ecfs);

// write metadata entry
Timer timer = Timer.startNew();
writer.write(ecfs.toMutation());
LOG.trace("{} metadata compation state write completed in {}ms", ecid,
writers.write(ecfs);
LOG.trace("{} metadata compaction state write completed in {}ms", ecid,
timer.elapsed(TimeUnit.MILLISECONDS));

if (!pendingNotifications.offer(ecfs)) {
@@ -139,7 +134,7 @@ public void failCompactions(Map<ExternalCompactionId,KeyExtent> compactionsToFail) {
var e = compactionsToFail.entrySet().iterator().next();
var ecfs =
new ExternalCompactionFinalState(e.getKey(), e.getValue(), FinalState.FAILED, 0L, 0L);
getWriter(e.getKey()).write(ecfs.toMutation());
writers.write(ecfs);
} else {
try (BatchWriter writer = context.createBatchWriter(Ample.DataLevel.USER.metaTable())) {
for (var e : compactionsToFail.entrySet()) {
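One final observation on the CompactionFinalizer changes: the new CompactionFinalizerWriters class is referenced here but not shown in this diff view. Based on the calls made to it, it presumably encapsulates the per-prefix SharedBatchWriter map previously built inline via getWriter(), and the iterator() exposed by the queue SPI appears to be what lets getQueuedIds() enumerate queued compaction ids for getQueuedCompletedCompactions() without draining the queue.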