Conversation

zhanghaou
Contributor

Motivation

Same as #24383

Currently, the usage of Caffeine in Pulsar code is inconsistent: some asynchronous builds use the default ForkJoin common pool, others specify a custom thread pool, and there are also synchronous builds.

The ForkJoin pool has a specific bug that can prevent tasks from being executed after being submitted to the thread pool. This issue has been fixed in Java 21.0.8, but it still exists in Java 17.
https://bugs.openjdk.org/browse/JDK-8330017
https://bugs.openjdk.org/browse/JDK-8351933

If the ForkJoin common pool gets blocked, it can prevent all dependent caches from fetching data.

Modifications

  1. When the executor in PulsarService is available, use it
  2. Add a new executor in AbstractMetadataStore.java with a corePoolSize of 4 (hard-coded for now; should it be configurable?)
  3. Use MoreExecutors.directExecutor() for the other Caffeine caches

The choice of thread pool may not be the most appropriate. Please provide suggestions for improvement, thanks.
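To make the three styles above concrete, here is a minimal sketch of the executor choices (assuming Caffeine 3.x and Guava on the classpath; the pool size, keys, and class names are illustrative, not the PR's actual code):

```java
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CaffeineExecutorStyles {

    // 1. Default: maintenance and async loads run on ForkJoinPool.commonPool().
    static AsyncLoadingCache<String, String> defaultStyle() {
        return Caffeine.newBuilder()
                .maximumSize(1000)
                .buildAsync(key -> "value-" + key);
    }

    // 2. Dedicated executor (the PR hard-codes 4 threads in AbstractMetadataStore;
    //    the pool here is only illustrative).
    static AsyncLoadingCache<String, String> dedicatedStyle(ExecutorService executor) {
        return Caffeine.newBuilder()
                .maximumSize(1000)
                .executor(executor)
                .buildAsync(key -> "value-" + key);
    }

    // 3. Direct executor: all cache work runs on the calling thread, so nothing
    //    depends on a shared pool (but nothing runs in the background either).
    static LoadingCache<String, String> directStyle() {
        return Caffeine.newBuilder()
                .maximumSize(1000)
                .executor(MoreExecutors.directExecutor())
                .build(key -> "value-" + key);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        System.out.println(defaultStyle().get("a").get());       // value-a
        System.out.println(dedicatedStyle(pool).get("b").get()); // value-b
        System.out.println(directStyle().get("c"));              // value-c
        pool.shutdown();
    }
}
```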

Verifying this change

  • Make sure that the change passes the CI checks.



@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 16, 2025
@lhotari
Member

lhotari commented Sep 16, 2025

This issue has been fixed in Java 21.0.8, but it still exists in Java 17.
https://bugs.openjdk.org/browse/JDK-8330017
https://bugs.openjdk.org/browse/JDK-8351933

Java 17.0.17 will include the fix and the planned release date is October 21st: https://wiki.openjdk.org/display/JDKUpdates/JDK+17u

@lhotari
Member

lhotari commented Sep 16, 2025

Currently, the usage of Caffeine in Pulsar code is inconsistent: some asynchronous builds use the default ForkJoin common pool, others specify a custom thread pool, and there are also synchronous builds.

why is this a problem if there are different ways to use Caffeine?

The ForkJoin pool has a specific bug that can prevent tasks from being executed after being submitted to the thread pool. This issue has been fixed in Java 21.0.8, but it still exists in Java 17. https://bugs.openjdk.org/browse/JDK-8330017 https://bugs.openjdk.org/browse/JDK-8351933

Since the fix for Java 17 will be released in 17.0.17 on October 21st, it will soon be addressed.

If the ForkJoin common pool gets blocked, it can prevent all dependent caches from fetching data.

I understand that it's severe when that happens. How often have you observed this bug? Does it require that the JVM has been running for a very long time, or is it just random when there's heavy load?

@zhanghaou
Contributor Author

why is this a problem if there are different ways to use Caffeine?

It’s not necessarily a problem, but I’d like to highlight that certain parts of the code explicitly specify a thread pool, whereas other parts continue to use the default thread pool.

I understand that it's severe when that happens. How often have you observed this bug? Does it require that the JVM has been running for a very long time, or is it just random when there's heavy load?

It occurs very rarely, only after running for a long time, and it doesn’t seem to be related to the load.

@lhotari
Member

lhotari commented Sep 16, 2025

It occurs very rarely, only after running for a long time, and it doesn’t seem to be related to the load.

what is "long time" here? days? weeks?

@lhotari
Member

lhotari commented Sep 16, 2025

It’s not necessarily a problem, but I’d like to highlight that certain parts of the code explicitly specify a thread pool, whereas other parts continue to use the default thread pool.

For blocking IO, ForkJoinPool is a reasonable solution to avoid blocking Netty IO threads or the Metadata store thread. VirtualThreads in Java 21 could be a better option eventually. Instead of removing ForkJoinPool usage blindly, there should be more analysis of each case so that we don't cause performance regressions. In Pulsar, the execution model isn't clean and more effort should be put in designing how thread pools and task execution should behave in general. This problem has existed for a long time so it's waiting for contributors. :)

@zhanghaou
Contributor Author

what is "long time" here? days? weeks?

We’ve seen it after both weeks and months of uptime.

Adding more info:
When the ForkJoin common pool stops executing tasks, Arthas can be used to inspect it.
The common pool is still reported as Running, but it has submissions stuck in its queue.

@forkjoinpool[
SWIDTH=@integer[16],
SMASK=@integer[65535],
MAX_CAP=@integer[32767],
SQMASK=@integer[126],
UNSIGNALLED=@integer[-2147483648],
SS_SEQ=@integer[65536],
QLOCK=@integer[1],
OWNED=@integer[1],
FIFO=@integer[65536],
SHUTDOWN=@integer[262144],
TERMINATED=@integer[524288],
STOP=@integer[-2147483648],
QUIET=@integer[1073741824],
DORMANT=@integer[-1073741824],
INITIAL_QUEUE_CAPACITY=@integer[8192],
MAXIMUM_QUEUE_CAPACITY=@integer[67108864],
TOP_BOUND_SHIFT=@integer[10],
defaultForkJoinWorkerThreadFactory=@DefaultForkJoinWorkerThreadFactory[java.util.concurrent.ForkJoinPool$DefaultForkJoinWorkerThreadFactory@43f5e9a3],
modifyThreadPermission=@RuntimePermission[("java.lang.RuntimePermission" "modifyThread")],
common=@forkjoinpool[java.util.concurrent.ForkJoinPool@2253dae0[Running, parallelism = 3, size = 2, active = 32770, running = 0, steals = 181625, tasks = 0, submissions = 27]],
COMMON_PARALLELISM=@integer[3],
COMMON_MAX_SPARES=@integer[256],
poolNumberSequence=@integer[0],
DEFAULT_KEEPALIVE=@long[60000],
TIMEOUT_SLOP=@long[20],
DEFAULT_COMMON_MAX_SPARES=@integer[256],
SEED_INCREMENT=@integer[-1640531527],
SP_MASK=@long[4294967295],
UC_MASK=@long[-4294967296],
RC_SHIFT=@integer[48],
RC_UNIT=@long[281474976710656],
RC_MASK=@long[-281474976710656],
TC_SHIFT=@integer[32],
TC_UNIT=@long[4294967296],
TC_MASK=@long[281470681743360],
ADD_WORKER=@long[140737488355328],
stealCount=@long[181603],
keepAlive=@long[60000],
indexSeed=@integer[-1126400000],
bounds=@integer[16842750],
mode=@integer[3],
workQueues=@WorkQueue[][isEmpty=false;size=8],
workerNamePrefix=@string[ForkJoinPool.commonPool-worker-],
factory=@DefaultForkJoinWorkerThreadFactory[java.util.concurrent.ForkJoinPool$DefaultForkJoinWorkerThreadFactory@43f5e9a3],
ueh=null,
saturate=null,
ctl=@long[9223372035728539649],
CTL=@FieldInstanceReadWrite[java.lang.invoke.VarHandleLongs$FieldInstanceReadWrite@1732b0fd],
MODE=@FieldInstanceReadWrite[java.lang.invoke.VarHandleInts$FieldInstanceReadWrite@3b3414a8],
QA=@array[java.lang.invoke.VarHandleObjects$Array@221b627],
$assertionsDisabled=@boolean[true],
]
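The same fields that Arthas dumped above can also be read through the public JDK API; a small probe along these lines (class and method names are my own) can flag the stuck state where submissions queue up while no worker is active:

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolProbe {
    // Returns a one-line summary of the fields seen in the Arthas dump:
    // pool size, active thread count, and queued (unclaimed) submissions.
    public static String summary() {
        ForkJoinPool pool = ForkJoinPool.commonPool();
        return "size=" + pool.getPoolSize()
                + " active=" + pool.getActiveThreadCount()
                + " queuedSubmissions=" + pool.getQueuedSubmissionCount();
    }

    public static void main(String[] args) {
        // On a healthy, idle JVM queuedSubmissions is 0; a persistently positive
        // value while no threads are active matches the stuck state shown above.
        System.out.println(summary());
    }
}
```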

@zhanghaou
Contributor Author

For blocking IO, ForkJoinPool is a reasonable solution to avoid blocking Netty IO threads or the Metadata store thread. VirtualThreads in Java 21 could be a better option eventually.

I agree that virtual threads are better. However, given the current situation, the ForkJoin common pool is not very suitable.

Instead of removing ForkJoinPool usage blindly, there should be more analysis of each case so that we don't cause performance regressions.

PR #22589 changed the executor from ForkJoinPool to the Pulsar service executor. It seems there was no analysis regarding performance regressions there either.

In Pulsar, the execution model isn't clean and more effort should be put in designing how thread pools and task execution should behave in general. This problem has existed for a long time so it's waiting for contributors. :)

Look forward to it.

Member

@nodece nodece left a comment


Previously, it used ForkJoinPool, but switching to MoreExecutors.directExecutor() avoids unnecessary thread context switching. In general, specifying an executor is beneficial for clarity and control.

Member

@lhotari lhotari left a comment


I don't support making this change, since it's blindly making changes across the code base. For example in JwksCache, it's necessary to have an actual executor to run tasks in background. The refreshAfterWrite will handle that. This PR breaks existing behavior.
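For context on the refreshAfterWrite point: Caffeine schedules reloads on the cache's executor, so readers keep getting the stale value while the refresh runs in the background. A hedged sketch (Caffeine assumed on the classpath; fetchRemote is a hypothetical stand-in for the JWKS download):

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RefreshAfterWriteExample {
    // Hypothetical stand-in for a network fetch (e.g. a JWKS download).
    static String fetchRemote(String key) {
        return "keys-for-" + key;
    }

    // Builds a cache whose periodic reloads run on the given executor rather
    // than on the reading thread; a direct executor here would make the reload
    // run synchronously inside get(), defeating the background refresh.
    static LoadingCache<String, String> buildCache(Executor refreshExecutor) {
        return Caffeine.newBuilder()
                .refreshAfterWrite(Duration.ofMinutes(5))
                .executor(refreshExecutor)
                .build(RefreshAfterWriteExample::fetchRemote);
    }

    public static void main(String[] args) {
        ExecutorService refreshPool = Executors.newFixedThreadPool(2);
        LoadingCache<String, String> jwksCache = buildCache(refreshPool);
        // The first read loads synchronously; once the entry is older than five
        // minutes, the next read returns the stale value and triggers a reload
        // on refreshPool in the background.
        System.out.println(jwksCache.get("issuer")); // keys-for-issuer
        refreshPool.shutdown();
    }
}
```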

@zhanghaou
Contributor Author

I don't support making this change, since it's blindly making changes across the code base. For example in JwksCache, it's necessary to have an actual executor to run tasks in background. The refreshAfterWrite will handle that. This PR breaks existing behavior.

Added a new executor to address it. Please take another look. Thanks.

BewareMyPower previously approved these changes Sep 19, 2025
@BewareMyPower BewareMyPower dismissed their stale review September 19, 2025 03:58

Since the fix for Java 17 will be released in 17.0.17 on October 21st, it will soon be addressed.

@nodece nodece requested a review from lhotari September 19, 2025 03:58
@BewareMyPower
Contributor

Since the fix for Java 17 will be released in 17.0.17 on October 21st, it will soon be addressed.

I saw this comment just now; it should be fine to wait for the JDK patch, so I dismissed my approval.

Using a dedicated executor is reasonable. But a ThreadPoolExecutor-based executor is not better than ForkJoinPool, which uses work-stealing and is also leveraged by the implementation of virtual threads. A simple difference is that if a task takes too long to complete, the tasks queued behind it on the same thread don't get a chance to execute.

ForkJoinPool is a better choice for independent tasks like these, used to avoid blocking Netty IO threads or the metadata store thread in Pulsar.
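The head-of-line blocking difference described above can be demonstrated with plain JDK classes; this sketch (names are mine) blocks the only worker of a fixed-size pool and shows that a trivial queued task cannot complete until the blocker is released:

```java
import java.util.concurrent.*;

public class HeadOfLineBlocking {
    // With a single worker thread, a queued task cannot run until the task
    // blocking that thread finishes -- there is no work-stealing and no
    // compensating worker as in a ForkJoinPool using ManagedBlocker.
    public static boolean queuedTaskCompletedWhileWorkerBlocked() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        CountDownLatch blockerStarted = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        pool.submit(() -> {
            blockerStarted.countDown();
            try {
                release.await();                 // simulate a long, blocking task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        blockerStarted.await();
        Future<?> queued = pool.submit(() -> { });   // trivial queued task
        boolean completed;
        try {
            queued.get(200, TimeUnit.MILLISECONDS);
            completed = true;
        } catch (TimeoutException e) {
            completed = false;                   // stuck behind the blocker
        }
        release.countDown();
        pool.shutdown();
        return completed;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(queuedTaskCompletedWhileWorkerBlocked()); // false
    }
}
```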

@zhanghaou
Contributor Author

I saw this comment just now; it should be fine to wait for the JDK patch, so I dismissed my approval.

Waiting for the JDK upgrade and using the latest version can indeed solve this problem, but wouldn't it be better if we could solve this issue without relying on a JDK upgrade?

If the user doesn't know that they need to upgrade the JDK to a specified version, there will always be a risk of blocking. That's really scary!

Using a dedicated executor is reasonable.

That's the purpose of this PR.

But a ThreadPoolExecutor-based executor is not better than ForkJoinPool, which uses work-stealing and is also leveraged by the implementation of virtual threads. A simple difference is that if a task takes too long to complete, the tasks queued behind it on the same thread don't get a chance to execute.

ForkJoinPool is a better choice for independent tasks like these, used to avoid blocking Netty IO threads or the metadata store thread in Pulsar.

Using a ThreadPoolExecutor-based executor follows the existing code in #22589.

@BewareMyPower

@BewareMyPower
Contributor

Technically, there are many places in Pulsar that use xxxAsync without the executor argument. We would have to pass a dedicated executor in all of them. That would change the code base a lot, and it becomes redundant once we upgrade to a newer JDK that includes the fix.

To prevent such cases, we would also have to disallow calling any of CompletableFuture's *Async methods without specifying the executor argument. It's hard to justify that restriction for a rarely occurring case that will be fixed soon.

It's a standard JDK API that is widely used, so many projects would be affected, not just Pulsar. I guess that's why @lhotari asked earlier, "what is 'long time' here? days? weeks?"
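For reference, the difference between the default and explicit-executor variants of the *Async methods can be observed directly (thread names here are illustrative; on a JVM where common-pool parallelism is below 2, the default falls back to a caller-runs executor):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncExecutorChoice {
    // Runs a supplier on a named dedicated pool and reports the executing thread.
    public static String dedicatedThreadName() throws Exception {
        ExecutorService dedicated =
                Executors.newFixedThreadPool(1, r -> new Thread(r, "dedicated-cache-executor"));
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), dedicated)
                    .get();
        } finally {
            dedicated.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Without the executor argument, the task goes to the ForkJoin common
        // pool (when its parallelism is at least 2).
        String defaultName = CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .get();
        System.out.println(defaultName);           // e.g. ForkJoinPool.commonPool-worker-1
        System.out.println(dedicatedThreadName()); // dedicated-cache-executor
    }
}
```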

@BewareMyPower
Contributor

A simple question: if I'm going to change the dedicated thread pool to a ForkJoinPool in the future, could it be blocked due to the bug that is fixed in recent JDK versions?

@zhanghaou
Contributor Author

A simple question: if I'm going to change the dedicated thread pool to a ForkJoinPool in the future, could it be blocked due to the bug that is fixed in recent JDK versions?

If we use a ForkJoinPool as the dedicated thread pool, rather than the common pool, then even if it gets blocked the impact is localized. Right now, with the ForkJoin common pool, the impact is global.

@zhanghaou
Contributor Author

Many projects would be affected, not just Pulsar.

Hi, @BewareMyPower
Could you share which Apache projects use the ForkJoin common pool in various places like this?

@BewareMyPower
Contributor

In principle, a Java project's code base should assume the JDK is reliable.

ForkJoinPool is widely used within the JDK, not only for CompletableFuture:

  • Parallel streams (parallelStream()) use ForkJoinPool to execute sub-tasks (see ForkJoinTask)
  • Virtual threads use ForkJoinPool as their default scheduler (see VirtualThread#createDefaultScheduler)
  • (not sure if there are other similar classes provided by the JDK)
  • 3rd-party Java infrastructure projects like Caffeine use the common ForkJoinPool as the default executor
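The first bullet is easy to observe: on a multi-core JVM, parallel-stream subtasks run on the calling thread plus common-pool workers, as a quick sketch shows:

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelStreamThreads {
    // Collects the distinct names of the threads that executed the subtasks.
    public static Set<String> workerNames() {
        return IntStream.range(0, 10_000).parallel()
                .mapToObj(i -> Thread.currentThread().getName())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // Typically prints the calling thread plus ForkJoinPool.commonPool-worker-N names.
        System.out.println(workerNames());
    }
}
```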

Now, your question might become: which Apache project uses parallel streams, virtual threads, the Caffeine cache (or any other dependency that uses the common ForkJoinPool as the default executor) without changing the default scheduler?

I don't have a tool to detect usages of ForkJoinPool across all open-source projects, or even just Apache projects, but this issue is beyond the scope of a single Java project like Apache Pulsar. If you have suffered from the bug in production and are eager to get a fix ASAP, you should:

  • Fork the project and apply the "fix" internally (I believe you have already done that)
  • Build a JDK that includes the fix yourself, or turn to an existing JDK vendor

However, a JDK bug that rarely occurs should not be a key factor driving large changes to the code base.

@BewareMyPower
Contributor

In short, using a dedicated pool is okay. Continuing to use ForkJoinPool is okay as well. I'm not against using a different thread pool for the Caffeine cache only, so I didn't leave a comment or request changes to block merging this PR. But this JDK bug should not be a reason to remove ForkJoinPool everywhere.

I'd like to hear @lhotari's voice again

@lhotari
Member

lhotari commented Sep 19, 2025

In short, using a dedicated pool is okay. Continuing to use ForkJoinPool is okay as well. I'm not against using a different thread pool for the Caffeine cache only, so I didn't leave a comment or request changes to block merging this PR. But this JDK bug should not be a reason to remove ForkJoinPool everywhere.

I'd like to hear @lhotari's voice again

@BewareMyPower I think your previous comment already covered most of it and was a really good answer.

Just adding a few items:

  • There's already a problem in Pulsar that existing executors aren't properly used and designed.
  • An ordinary executor behaves differently from ForkJoinPool: ForkJoinPool can expand its number of threads when it detects blocking and handle this efficiently. This might be relevant in some current usages of executors; switching to an ordinary executor could cause deadlocks in cases where the existing code doesn't deadlock, precisely because of ForkJoinPool.
  • MoreExecutors.directExecutor() shouldn't be used at all when refreshAfterWrite is used with Caffeine, since it would defeat the purpose of refreshing cached items in the background. I don't see a reason why someone would want to make an async Caffeine cache synchronous; that doesn't make sense.
