[SPARK-56302][CORE] Free task result memory eagerly during serialization on executor #55110
ivoson wants to merge 6 commits into apache:master from
Conversation
Force-pushed f34ec4d to 822f147
cc @LuciferYang @Ngone51 can you please take a look? Thx
@ivoson Which versions does this fix need to be backported to? |
4.1 should be fine if possible. |
What do you mean by "the next is produced"? BTW, aren't those 3 fields local variables? For a local variable, it shouldn't make much difference whether we null it out earlier or not. Or, is there an obvious delay before the …
We'll serialize the result multiple times,
All the serialization steps happen after … cc @Ngone51
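To illustrate the point under discussion: even though the variables are locals, they remain in scope across the later, allocation-heavy serialization steps, so nulling them right after their last use makes the earlier copies unambiguously GC-eligible instead of relying on the JIT's liveness analysis. A minimal sketch with hypothetical names (`serialize` and `wrap` are stand-ins, not Spark's API):

```java
// Sketch only: illustrates the "null out early" pattern discussed above.
public class NullOutEarly {
    // Hypothetical stand-ins for the two serialization steps.
    static byte[] serialize(byte[] a) { return a.clone(); }
    static byte[] wrap(byte[] a) { return a.clone(); }

    static byte[] run(int resultSize) {
        byte[] value = new byte[resultSize]; // raw result
        byte[] buf = serialize(value);       // first copy; last use of `value`
        value = null;                        // make the dead reference explicit for the GC
        byte[] out = wrap(buf);              // second copy; last use of `buf`
        buf = null;
        return out;                          // peak live set: ~2 copies instead of 3
    }
}
```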
.github/workflows/build_and_test.yml
Outdated

    - name: Install Python packages (Python 3.12)
      run: |
        python3.12 -m pip install 'numpy>=1.22' pyarrow 'pandas==2.3.3' pyyaml scipy unittest-xml-reporting 'lxml==4.9.4' 'grpcio==1.76.0' 'grpcio-status==1.76.0' 'protobuf==6.33.5' 'zstandard==0.25.0'
        python3.12 -m pip list
AI generated for test, will revert...
I think we might not need to install these packages; just installing Python 3.12 should be enough.
.github/workflows/build_and_test.yml
Outdated

      run: |
        sudo apt update
        sudo apt-get install r-base
    - name: Install Python 3.12
Is there something wrong with the current K8s test? We should fix it in a separate PR.
Sounds good, will revert this change after the fix is verified.
In the previous CI jobs, it seems the k8s-integration-test failed due to a Python syntax error while submitting a CI test:
Traceback (most recent call last):
File "/opt/spark/tests/decommissioning.py", line 21, in <module>
from pyspark.sql import SparkSession
File "/opt/spark/python/lib/pyspark.zip/pyspark/__init__.py", line 56, in <module>
File "/opt/spark/python/lib/pyspark.zip/pyspark/core/rdd.py", line 73, in <module>
File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
File "<frozen importlib._bootstrap>", line 1002, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 945, in _find_spec
File "<frozen importlib._bootstrap_external>", line 1439, in find_spec
File "<frozen importlib._bootstrap_external>", line 1411, in _get_spec
File "<frozen zipimport>", line 170, in find_spec
File "<frozen importlib._bootstrap>", line 431, in spec_from_loader
File "<frozen importlib._bootstrap_external>", line 741, in spec_from_file_location
File "<frozen zipimport>", line 229, in get_filename
File "<frozen zipimport>", line 767, in _get_module_code
File "<frozen zipimport>", line 696, in _compile_source
File "/opt/spark/python/lib/pyspark.zip/pyspark/rddsampler.py", line 104
class RDDStratifiedSampler[K: Hashable](RDDSamplerBase):
^
SyntaxError: invalid syntax
01:47:02.570 DEBUG org.apache.spark.util.ShutdownHookManager: Shutdown hook called
https://github.com/ivoson/spark/actions/runs/23891768548/job/69666818706
The CI job passed after explicitly installing Python 3.12.
What changes were proposed in this pull request?
Eagerly null intermediate objects during task result serialization in Executor to reduce peak heap memory usage.
During result serialization in TaskRunner.run(), three representations of the result coexist on the heap simultaneously:
- value: the raw task result object from task.run()
- valueByteBuffer: the first serialization of the result
- serializedDirectResult: the second serialization, wrapping the above into a DirectTaskResult
Each becomes dead as soon as the next is produced, but none were released.
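The lifecycle above, with the eager nulling applied, can be sketched as follows. Variable names mirror the description; the serializer and the DirectTaskResult wrapping are simplified stand-ins, not Spark's actual code:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified sketch of the result path in TaskRunner.run(); not Spark's actual code.
public class TaskResultLifecycle {
    // Stand-in for serializing the raw task result.
    static ByteBuffer serializeValue(Object v) {
        return ByteBuffer.wrap(v.toString().getBytes(StandardCharsets.UTF_8));
    }

    // Stand-in for wrapping into a DirectTaskResult and serializing again.
    static ByteBuffer serializeDirectResult(ByteBuffer b) {
        return b.duplicate();
    }

    static ByteBuffer serializeResult(Object taskResult) {
        Object value = taskResult;                          // 1. raw task result
        ByteBuffer valueByteBuffer = serializeValue(value); // 2. first serialization
        value = null;                                       // raw result is dead: release it
        ByteBuffer serializedDirectResult =
            serializeDirectResult(valueByteBuffer);         // 3. second serialization
        valueByteBuffer = null;                             // first copy is dead: release it
        return serializedDirectResult;                      // only the final copy survives
    }
}
```

Without the two `= null` assignments, all three representations stay reachable from the stack frame until the method returns.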
This PR nulls each reference as soon as it's no longer needed:
- value = null after serializing into valueByteBuffer
- valueByteBuffer = null and directResult = null after re-serializing into serializedDirectResult
All changes are confined to the executor side within TaskRunner.run(), where the variables are local and not exposed to other components.
Why are the changes needed?
For tasks returning large results (e.g. collect() on large datasets), the redundant copies can roughly triple peak memory during serialization, increasing GC pressure or causing executor OOM. Eagerly freeing dead references lets the GC reclaim memory sooner.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing UTs
Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code v2.1.88