Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions dev-support/Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pipeline {
environment {
YETUS='yetus'
// Branch or tag name. Yetus release tags are 'rel/X.Y.Z'
YETUS_VERSION='rel/0.14.0'
YETUS_VERSION='a7d29a6a72750a0c5c39512f33945e773e69303e'
}

parameters {
Expand All @@ -71,7 +71,7 @@ pipeline {
checkout([
$class: 'GitSCM',
branches: [[name: "${env.YETUS_VERSION}"]],
userRemoteConfigs: [[ url: 'https://github.com/apache/yetus.git']]]
userRemoteConfigs: [[ url: 'https://github.com/ayushtkn/yetus.git']]]
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopThread;
import org.apache.hadoop.conf.ReconfigurationUtil.PropertyChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -105,15 +106,16 @@ public Collection<PropertyChange> getChangedProperties(
/**
* A background thread to apply configuration changes.
*/
private static class ReconfigurationThread extends Thread {
private static class ReconfigurationThread extends HadoopThread {
private ReconfigurableBase parent;

ReconfigurationThread(ReconfigurableBase base) {
super();
this.parent = base;
}

// See {@link ReconfigurationServlet#applyChanges}
public void run() {
public void work() {
LOG.info("Starting reconfiguration task.");
final Configuration oldConf = parent.getConf();
final Configuration newConf = parent.getNewConf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.concurrent.HadoopThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -107,7 +108,7 @@ void init() {
*/
private void initRefreshThread(boolean runImmediately) {
if (refreshInterval > 0) {
refreshUsed = new Thread(new RefreshThread(this, runImmediately),
refreshUsed = new HadoopThread(new RefreshThread(this, runImmediately),
"refreshUsed-" + dirPath);
refreshUsed.setDaemon(true);
refreshUsed.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,7 +39,7 @@
*/
@InterfaceAudience.Private
public class DelegationTokenRenewer
extends Thread {
extends HadoopThread {
private static final Logger LOG = LoggerFactory
.getLogger(DelegationTokenRenewer.class);

Expand Down Expand Up @@ -263,7 +264,7 @@ public <T extends FileSystem & Renewable> void removeRenewAction(
}

@Override
public void run() {
public void work() {
for(;;) {
RenewAction<?> action = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.concurrent.HadoopThread;
import org.apache.hadoop.tracing.Tracer;
import org.apache.hadoop.tracing.TraceScope;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -4087,7 +4088,7 @@ private interface StatisticsAggregator<T> {
static {
STATS_DATA_REF_QUEUE = new ReferenceQueue<>();
// start a single daemon cleaner thread
STATS_DATA_CLEANER = new Thread(new StatisticsDataReferenceCleaner());
STATS_DATA_CLEANER = new HadoopThread(new StatisticsDataReferenceCleaner());
STATS_DATA_CLEANER.
setName(StatisticsDataReferenceCleaner.class.getName());
STATS_DATA_CLEANER.setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public void uncaughtException(Thread t, Throwable e) {
}

@Override
public void run() {
public void work() {
while (shouldRun) {
try {
loopUntilConnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ha;

import org.apache.hadoop.util.concurrent.HadoopThread;
import org.slf4j.Logger;

import java.io.BufferedReader;
Expand Down Expand Up @@ -50,7 +51,7 @@ enum StreamType {
this.stream = stream;
this.type = type;

thread = new Thread(new Runnable() {
thread = new HadoopThread(new Runnable() {
@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void tryStart() {
if (running.compareAndSet(null, current)) {
final Daemon daemon = new Daemon() {
@Override
public void run() {
public void work() {
for (; isRunning(this);) {
final long waitTime = checkCalls();
tryStop(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.apache.hadoop.util.concurrent.HadoopThread;
import org.apache.hadoop.tracing.Span;
import org.apache.hadoop.tracing.Tracer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -407,7 +408,7 @@ public synchronized void setRpcResponse(Writable rpcResponse) {
/** Thread that reads responses and notifies callers. Each connection owns a
* socket connected to a remote address. Calls are multiplexed through this
* socket: responses may be delivered out of order. */
private class Connection extends Thread {
private class Connection extends HadoopThread {
private InetSocketAddress server; // server ip:port
private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method
Expand Down Expand Up @@ -448,7 +449,7 @@ private class Connection extends Thread {
Consumer<Connection> removeMethod) {
this.remoteId = remoteId;
this.server = remoteId.getAddress();
this.rpcRequestThread = new Thread(new RpcRequestSender(),
this.rpcRequestThread = new HadoopThread(new RpcRequestSender(),
"IPC Parameter Sending Thread for " + remoteId);
this.rpcRequestThread.setDaemon(true);

Expand Down Expand Up @@ -1126,7 +1127,7 @@ private synchronized void sendPing() throws IOException {
}

@Override
public void run() {
public void work() {
try {
// Don't start the ipc parameter sending thread until we start this
// thread, because the shutdown logic only gets triggered if this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@
import org.apache.hadoop.util.ProtoUtil;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopThread;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.tracing.Span;
import org.apache.hadoop.tracing.SpanContext;
Expand Down Expand Up @@ -1471,7 +1473,7 @@ public String toString() {
}

/** Listens on the socket. Creates jobs for the handler threads*/
private class Listener extends Thread {
private class Listener extends HadoopThread {

private ServerSocketChannel acceptChannel = null; //the accept channel
private Selector selector = null; //the selector that we use for the server
Expand Down Expand Up @@ -1520,7 +1522,7 @@ void setIsAuxiliary() {
this.isOnAuxiliaryPort = true;
}

private class Reader extends Thread {
private class Reader extends HadoopThread {
final private BlockingQueue<Connection> pendingConnections;
private final Selector readSelector;

Expand All @@ -1533,7 +1535,7 @@ private class Reader extends Thread {
}

@Override
public void run() {
public void work() {
LOG.info("Starting " + Thread.currentThread().getName());
try {
doRunLoop();
Expand Down Expand Up @@ -1612,7 +1614,7 @@ void shutdown() {
}

@Override
public void run() {
public void work() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
connectionManager.startIdleScan();
Expand Down Expand Up @@ -1760,7 +1762,7 @@ Reader getReader() {
}

// Sends responses of RPC back to clients.
private class Responder extends Thread {
private class Responder extends HadoopThread {
private final Selector writeSelector;
private int pending; // connections waiting to register

Expand All @@ -1772,7 +1774,7 @@ private class Responder extends Thread {
}

@Override
public void run() {
public void work() {
LOG.info(Thread.currentThread().getName() + ": starting");
SERVER.set(Server.this);
try {
Expand Down Expand Up @@ -3219,15 +3221,15 @@ private void internalQueueCall(Call call, boolean blocking)
}

/** Handles queued calls . */
private class Handler extends Thread {
private class Handler extends HadoopThread {
public Handler(int instanceNumber) {
this.setDaemon(true);
this.setName("IPC Server handler "+ instanceNumber +
" on default port " + port);
}

@Override
public void run() {
public void work() {
LOG.debug("{}: starting", Thread.currentThread().getName());
SERVER.set(Server.this);
while (running) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.hadoop.metrics2.MetricsFilter;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,7 +49,7 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
private final MetricsSink sink;
private final MetricsFilter sourceFilter, recordFilter, metricFilter;
private final SinkQueue<MetricsBuffer> queue;
private final Thread sinkThread;
private final HadoopThread sinkThread;
private volatile boolean stopping = false;
private volatile boolean inError = false;
private final int periodMs, firstRetryDelay, retryCount;
Expand Down Expand Up @@ -84,8 +85,8 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
"Dropped updates per sink", 0);
qsize = registry.newGauge("Sink_"+ name + "Qsize", "Queue size", 0);

sinkThread = new Thread() {
@Override public void run() {
sinkThread = new HadoopThread() {
@Override public void work() {
publishMetricsFromQueue();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.hadoop.util.NativeCodeLoader;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

import org.apache.hadoop.util.concurrent.HadoopThread;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -440,7 +440,7 @@ private void sendCallbackAndRemove(String caller,
}

@VisibleForTesting
final Thread watcherThread = new Thread(new Runnable() {
final Thread watcherThread = new HadoopThread(new Runnable() {
@Override
public void run() {
if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;

import org.apache.hadoop.util.concurrent.HadoopThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -930,7 +930,7 @@ private void executeAutoRenewalTask(final String userName,
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
Thread t = new HadoopThread(r);
t.setDaemon(true);
t.setName("TGT Renewer for " + userName);
return t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time;

import org.apache.hadoop.util.concurrent.HadoopThread;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.functional.InvocationRaisingIOE;
import org.slf4j.Logger;
Expand Down Expand Up @@ -912,12 +912,12 @@ public boolean isRunning() {
return running;
}

private class ExpiredTokenRemover extends Thread {
private class ExpiredTokenRemover extends HadoopThread {
private long lastMasterKeyUpdate;
private long lastTokenCacheCleanup;

@Override
public void run() {
public void work() {
LOG.info("Starting expired delegation token remover thread, "
+ "tokenRemoverScanInterval=" + tokenRemoverScanInterval
/ (60 * 1000) + " min(s)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.concurrent.HadoopThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -116,7 +117,7 @@ public void interrupted(IrqHandler.InterruptData interruptData) {
//start an async shutdown thread with a timeout
ServiceForcedShutdown shutdown =
new ServiceForcedShutdown(service, shutdownTimeMillis);
Thread thread = new Thread(shutdown);
Thread thread = new HadoopThread(shutdown);
thread.setDaemon(true);
thread.setName("Service Forced Shutdown");
thread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.concurrent.HadoopThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -74,7 +75,7 @@ public AsyncDiskService(String[] volumes) {
threadFactory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(threadGroup, r);
return new HadoopThread(threadGroup, r);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.util.concurrent.HadoopThread;

/**
* This ExecutorService blocks the submission of new tasks when its queue is
Expand Down Expand Up @@ -71,7 +72,7 @@ static ThreadFactory getNamedThreadFactory(final String prefix) {
public Thread newThread(Runnable r) {
final String name =
prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
return new Thread(group, r, name);
return new HadoopThread(group, r, name);
}
};
}
Expand Down
Loading