diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java index 7f6f70c061571..bacb066184f48 100644 --- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java +++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java @@ -41,6 +41,7 @@ import io.kubernetes.client.util.Config; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.File; import java.io.IOException; import java.net.SocketAddress; @@ -50,6 +51,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.naming.AuthenticationException; import javax.net.ssl.SSLSession; import okhttp3.OkHttpClient; @@ -105,6 +108,8 @@ public class AuthenticationProviderOpenID implements AuthenticationProvider { private volatile AsyncHttpClient httpClient; + private ExecutorService cacheExecutor; + // A list of supported algorithms. This is the "alg" field on the JWT. // Source for strings: https://datatracker.ietf.org/doc/html/rfc7518#section-3.1. private static final String ALG_RS256 = "RS256"; @@ -188,10 +193,13 @@ public void initialize(Context context) throws IOException { .setReadTimeout(readTimeout) .setSslContext(sslContext) .build(); + cacheExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), + new DefaultThreadFactory("authentication-cache-loader")); httpClient = new DefaultAsyncHttpClient(clientConfig); k8sApiClient = fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED ? Config.defaultClient() : null; - this.openIDProviderMetadataCache = new OpenIDProviderMetadataCache(this, config, httpClient, k8sApiClient); - this.jwksCache = new JwksCache(this, config, httpClient, k8sApiClient); + this.openIDProviderMetadataCache = + new OpenIDProviderMetadataCache(this, config, httpClient, k8sApiClient, cacheExecutor); + this.jwksCache = new JwksCache(this, config, httpClient, k8sApiClient, cacheExecutor); } @Override @@ -380,6 +388,9 @@ public void close() throws IOException { if (httpClient != null) { httpClient.close(); } + if (cacheExecutor != null) { + cacheExecutor.shutdown(); + } if (k8sApiClient != null) { OkHttpClient okHttpClient = k8sApiClient.getHttpClient(); okHttpClient.dispatcher().executorService().shutdown(); diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java index 71070471d146e..c08422c533499 100644 --- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java +++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java @@ -46,6 +46,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import javax.naming.AuthenticationException; import org.apache.pulsar.broker.ServiceConfiguration; @@ -65,7 +66,7 @@ public class JwksCache { private final AuthenticationProvider authenticationProvider; JwksCache(AuthenticationProvider authenticationProvider, ServiceConfiguration config, - AsyncHttpClient httpClient, ApiClient apiClient) throws IOException { + AsyncHttpClient httpClient, ApiClient apiClient, ExecutorService cacheExecutor) throws IOException { this.authenticationProvider = authenticationProvider; // Store the clients this.httpClient = httpClient; @@ -88,6 +89,7 @@ public class JwksCache { } }; this.cache = Caffeine.newBuilder() + .executor(cacheExecutor) .recordStats() .maximumSize(maxSize) .refreshAfterWrite(refreshAfterWriteSeconds, TimeUnit.SECONDS) diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java index 66e71c5306f8d..89ca18f6c1a5a 100644 --- a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java +++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import javax.naming.AuthenticationException; import org.apache.pulsar.broker.ServiceConfiguration; @@ -61,7 +62,7 @@ class OpenIDProviderMetadataCache { private static final String SLASH_WELL_KNOWN_OPENID_CONFIG = "/" + WELL_KNOWN_OPENID_CONFIG; OpenIDProviderMetadataCache(AuthenticationProvider authenticationProvider, ServiceConfiguration config, - AsyncHttpClient httpClient, ApiClient apiClient) { + AsyncHttpClient httpClient, ApiClient apiClient, ExecutorService cacheExecutor) { this.authenticationProvider = authenticationProvider; int maxSize = getConfigValueAsInt(config, CACHE_SIZE, CACHE_SIZE_DEFAULT); int refreshAfterWriteSeconds = getConfigValueAsInt(config, CACHE_REFRESH_AFTER_WRITE_SECONDS, @@ -78,6 +79,7 @@ class OpenIDProviderMetadataCache { } }; this.cache = Caffeine.newBuilder() + .executor(cacheExecutor) .recordStats() .maximumSize(maxSize) .refreshAfterWrite(refreshAfterWriteSeconds, TimeUnit.SECONDS) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java index b199db883044a..931a6176a3f44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java @@ -21,6 +21,7 @@ import com.github.benmanes.caffeine.cache.AsyncCache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; import io.opentelemetry.api.OpenTelemetry; import java.io.IOException; @@ -56,8 +57,11 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage { private BookkeeperManagedLedgerStorageClass defaultStorageClass; private ManagedLedgerFactory managedLedgerFactory; private BookKeeper defaultBkClient; - private final AsyncCache - bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().recordStats().buildAsync(); + private final AsyncCache bkEnsemblePolicyToBkClientMap = + Caffeine.newBuilder() + .executor(MoreExecutors.directExecutor()) + .recordStats() + .buildAsync(); private StatsProvider statsProvider = new NullStatsProvider(); public ManagedLedgerClientFactory() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java index 69f5208ce6711..0bc507efbcd91 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java @@ -79,6 +79,7 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) { this.hashFunc = hashFunc; this.bundlesCache = Caffeine.newBuilder() + .executor(pulsar.getExecutor()) .recordStats() .buildAsync(this::loadBundles); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index db1aecf3887b7..85573d8c4be5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -23,6 +23,7 @@ import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ComparisonChain; +import com.google.common.util.concurrent.MoreExecutors; import io.netty.buffer.ByteBuf; import java.util.ArrayList; import java.util.Collections; @@ -223,6 +224,7 @@ static void findStartPointLoop(Position p, long start, long end, static AsyncLoadingCache createCache(LedgerHandle lh, long maxSize) { return Caffeine.newBuilder() + .executor(MoreExecutors.directExecutor()) .maximumSize(maxSize) .buildAsync((entryId, executor) -> readOneMessageId(lh, entryId)); } diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java index b1f0572547ca7..620397c2a1946 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java @@ -97,6 +97,7 @@ public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde cacheBuilder.expireAfterWrite(cacheConfig.getExpireAfterWriteMillis(), TimeUnit.MILLISECONDS); } this.objCache = cacheBuilder + .executor(executor) .recordStats() .buildAsync(new AsyncCacheLoader>>() { @Override diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java index b0e4b43f70067..7372d11f09363 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java @@ -77,6 +77,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co private final CopyOnWriteArrayList> sessionListeners = new CopyOnWriteArrayList<>(); protected final String metadataStoreName; protected final ScheduledExecutorService executor; + protected final ScheduledExecutorService cacheExecutor; private final AsyncLoadingCache> childrenCache; private final AsyncLoadingCache existsCache; private final CopyOnWriteArrayList> metadataCaches = new CopyOnWriteArrayList<>(); @@ -100,9 +101,12 @@ protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTele this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory( StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName())); + this.cacheExecutor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), + new DefaultThreadFactory("metadata-cache-loader")); registerListener(this); this.childrenCache = Caffeine.newBuilder() + .executor(cacheExecutor) .recordStats() .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS) @@ -126,6 +130,7 @@ public CompletableFuture> asyncReload(String key, List oldV CacheMetricsCollector.CAFFEINE.addCache(metadataStoreName + "-children", childrenCache); this.existsCache = Caffeine.newBuilder() + .executor(cacheExecutor) .recordStats() .refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS) .expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS) @@ -249,7 +254,7 @@ public MetadataCache getMetadataCache(Class clazz, MetadataCacheConfig JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(clazz, null); String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl metadataCache = - new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.executor); + new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.cacheExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -258,7 +263,7 @@ public MetadataCache getMetadataCache(Class clazz, MetadataCacheConfig public MetadataCache getMetadataCache(TypeReference typeRef, MetadataCacheConfig cacheConfig) { String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName(); MetadataCacheImpl metadataCache = - new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.executor); + new MetadataCacheImpl(cacheName, this, typeRef, cacheConfig, this.cacheExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -267,8 +272,7 @@ public MetadataCache getMetadataCache(TypeReference typeRef, MetadataC public MetadataCache getMetadataCache(String cacheName, MetadataSerde serde, MetadataCacheConfig cacheConfig) { MetadataCacheImpl metadataCache = - new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig, - this.executor); + new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig, this.cacheExecutor); metadataCaches.add(metadataCache); return metadataCache; } @@ -556,6 +560,8 @@ protected static CompletableFuture alreadyClosedFailedFuture() { @Override public void close() throws Exception { + cacheExecutor.shutdownNow(); + cacheExecutor.awaitTermination(10, TimeUnit.SECONDS); executor.shutdownNow(); executor.awaitTermination(10, TimeUnit.SECONDS); this.metadataStoreStats.close();