Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.kubernetes.client.util.Config;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.File;
import java.io.IOException;
import java.net.SocketAddress;
Expand All @@ -50,6 +51,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -105,6 +108,8 @@ public class AuthenticationProviderOpenID implements AuthenticationProvider {

private volatile AsyncHttpClient httpClient;

private ExecutorService cacheExecutor;

// A list of supported algorithms. This is the "alg" field on the JWT.
// Source for strings: https://datatracker.ietf.org/doc/html/rfc7518#section-3.1.
private static final String ALG_RS256 = "RS256";
Expand Down Expand Up @@ -188,10 +193,13 @@ public void initialize(Context context) throws IOException {
.setReadTimeout(readTimeout)
.setSslContext(sslContext)
.build();
cacheExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("authentication-cache-loader"));
httpClient = new DefaultAsyncHttpClient(clientConfig);
k8sApiClient = fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED ? Config.defaultClient() : null;
this.openIDProviderMetadataCache = new OpenIDProviderMetadataCache(this, config, httpClient, k8sApiClient);
this.jwksCache = new JwksCache(this, config, httpClient, k8sApiClient);
this.openIDProviderMetadataCache =
new OpenIDProviderMetadataCache(this, config, httpClient, k8sApiClient, cacheExecutor);
this.jwksCache = new JwksCache(this, config, httpClient, k8sApiClient, cacheExecutor);
}

@Override
Expand Down Expand Up @@ -380,6 +388,9 @@ public void close() throws IOException {
if (httpClient != null) {
httpClient.close();
}
if (cacheExecutor != null) {
cacheExecutor.shutdown();
}
if (k8sApiClient != null) {
OkHttpClient okHttpClient = k8sApiClient.getHttpClient();
okHttpClient.dispatcher().executorService().shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -65,7 +66,7 @@ public class JwksCache {
private final AuthenticationProvider authenticationProvider;

JwksCache(AuthenticationProvider authenticationProvider, ServiceConfiguration config,
AsyncHttpClient httpClient, ApiClient apiClient) throws IOException {
AsyncHttpClient httpClient, ApiClient apiClient, ExecutorService cacheExecutor) throws IOException {
this.authenticationProvider = authenticationProvider;
// Store the clients
this.httpClient = httpClient;
Expand All @@ -88,6 +89,7 @@ public class JwksCache {
}
};
this.cache = Caffeine.newBuilder()
.executor(cacheExecutor)
.recordStats()
.maximumSize(maxSize)
.refreshAfterWrite(refreshAfterWriteSeconds, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -61,7 +62,7 @@ class OpenIDProviderMetadataCache {
private static final String SLASH_WELL_KNOWN_OPENID_CONFIG = "/" + WELL_KNOWN_OPENID_CONFIG;

OpenIDProviderMetadataCache(AuthenticationProvider authenticationProvider, ServiceConfiguration config,
AsyncHttpClient httpClient, ApiClient apiClient) {
AsyncHttpClient httpClient, ApiClient apiClient, ExecutorService cacheExecutor) {
this.authenticationProvider = authenticationProvider;
int maxSize = getConfigValueAsInt(config, CACHE_SIZE, CACHE_SIZE_DEFAULT);
int refreshAfterWriteSeconds = getConfigValueAsInt(config, CACHE_REFRESH_AFTER_WRITE_SECONDS,
Expand All @@ -78,6 +79,7 @@ class OpenIDProviderMetadataCache {
}
};
this.cache = Caffeine.newBuilder()
.executor(cacheExecutor)
.recordStats()
.maximumSize(maxSize)
.refreshAfterWrite(refreshAfterWriteSeconds, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
Expand Down Expand Up @@ -56,8 +57,11 @@ public class ManagedLedgerClientFactory implements ManagedLedgerStorage {
private BookkeeperManagedLedgerStorageClass defaultStorageClass;
private ManagedLedgerFactory managedLedgerFactory;
private BookKeeper defaultBkClient;
private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().recordStats().buildAsync();
private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper> bkEnsemblePolicyToBkClientMap =
Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.recordStats()
.buildAsync();
private StatsProvider statsProvider = new NullStatsProvider();

public ManagedLedgerClientFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public NamespaceBundleFactory(PulsarService pulsar, HashFunction hashFunc) {
this.hashFunc = hashFunc;

this.bundlesCache = Caffeine.newBuilder()
.executor(pulsar.getExecutor())
.recordStats()
.buildAsync(this::loadBundles);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -223,6 +224,7 @@ static void findStartPointLoop(Position p, long start, long end,
static AsyncLoadingCache<Long, MessageIdData> createCache(LedgerHandle lh,
long maxSize) {
return Caffeine.newBuilder()
.executor(MoreExecutors.directExecutor())
.maximumSize(maxSize)
.buildAsync((entryId, executor) -> readOneMessageId(lh, entryId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ public MetadataCacheImpl(String cacheName, MetadataStore store, MetadataSerde<T>
cacheBuilder.expireAfterWrite(cacheConfig.getExpireAfterWriteMillis(), TimeUnit.MILLISECONDS);
}
this.objCache = cacheBuilder
.executor(executor)
.recordStats()
.buildAsync(new AsyncCacheLoader<String, Optional<CacheGetResult<T>>>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
protected final String metadataStoreName;
protected final ScheduledExecutorService executor;
protected final ScheduledExecutorService cacheExecutor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
private final AsyncLoadingCache<String, Boolean> existsCache;
private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
Expand All @@ -100,9 +101,12 @@ protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTele
this.executor = new ScheduledThreadPoolExecutor(1,
new DefaultThreadFactory(
StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName()));
this.cacheExecutor = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory("metadata-cache-loader"));
registerListener(this);

this.childrenCache = Caffeine.newBuilder()
.executor(cacheExecutor)
.recordStats()
.refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
.expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS)
Expand All @@ -126,6 +130,7 @@ public CompletableFuture<List<String>> asyncReload(String key, List<String> oldV
CacheMetricsCollector.CAFFEINE.addCache(metadataStoreName + "-children", childrenCache);

this.existsCache = Caffeine.newBuilder()
.executor(cacheExecutor)
.recordStats()
.refreshAfterWrite(CACHE_REFRESH_TIME_MILLIS, TimeUnit.MILLISECONDS)
.expireAfterWrite(CACHE_REFRESH_TIME_MILLIS * 2, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -249,7 +254,7 @@ public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig
JavaType typeRef = TypeFactory.defaultInstance().constructSimpleType(clazz, null);
String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName();
MetadataCacheImpl<T> metadataCache =
new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor);
new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.cacheExecutor);
metadataCaches.add(metadataCache);
return metadataCache;
}
Expand All @@ -258,7 +263,7 @@ public <T> MetadataCache<T> getMetadataCache(Class<T> clazz, MetadataCacheConfig
public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataCacheConfig cacheConfig) {
String cacheName = StringUtils.isNotBlank(metadataStoreName) ? metadataStoreName : getClass().getSimpleName();
MetadataCacheImpl<T> metadataCache =
new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.executor);
new MetadataCacheImpl<T>(cacheName, this, typeRef, cacheConfig, this.cacheExecutor);
metadataCaches.add(metadataCache);
return metadataCache;
}
Expand All @@ -267,8 +272,7 @@ public <T> MetadataCache<T> getMetadataCache(TypeReference<T> typeRef, MetadataC
public <T> MetadataCache<T> getMetadataCache(String cacheName, MetadataSerde<T> serde,
MetadataCacheConfig cacheConfig) {
MetadataCacheImpl<T> metadataCache =
new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig,
this.executor);
new MetadataCacheImpl<>(cacheName, this, serde, cacheConfig, this.cacheExecutor);
metadataCaches.add(metadataCache);
return metadataCache;
}
Expand Down Expand Up @@ -556,6 +560,8 @@ protected static <T> CompletableFuture<T> alreadyClosedFailedFuture() {

@Override
public void close() throws Exception {
cacheExecutor.shutdownNow();
cacheExecutor.awaitTermination(10, TimeUnit.SECONDS);
executor.shutdownNow();
executor.awaitTermination(10, TimeUnit.SECONDS);
this.metadataStoreStats.close();
Expand Down
Loading