[SPARK-55278] Introduce module and core abstraction for language-agnostic UDF worker. #55089

Open
haiyangsun-db wants to merge 6 commits into apache:master from haiyangsun-db:SPARK-55278

Conversation

Contributor

@haiyangsun-db haiyangsun-db commented Mar 30, 2026

What changes were proposed in this pull request?

This PR introduces the foundational package structure and core abstractions for the language-agnostic UDF worker framework described in SPIP SPARK-55278.

The new udf/worker module contains two sub-modules:

  • proto/ — Protobuf definition of UDFWorkerSpecification (currently a placeholder; full schema to follow), plus a typed Scala wrapper:

    • WorkerSpecification — Scala wrapper around the protobuf spec.
  • core/ — Engine-side APIs (all @Experimental):

    • WorkerDispatcher — manages workers for a given spec; creates sessions. Handles pooling, reuse, and lifecycle behind the scenes. Extends AutoCloseable.
    • WorkerSession — represents a single UDF execution. Not 1-to-1 with a worker process; multiple sessions may share the same underlying worker. Extends AutoCloseable with a default no-op close() so callers can use try-with-resources from the start.
    • WorkerSecurityScope — identifies a security boundary for worker connection pooling. Requires subclasses to implement equals/hashCode so that structurally equivalent scopes enable worker reuse.
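Taken together, the abstractions above can be sketched roughly as follows. This is a minimal, self-contained sketch based on the descriptions in this PR, not the actual source: the @Experimental annotation and the protobuf-backed spec are omitted, and the createSession method name is an illustrative assumption.

```scala
// One UDF execution. Not 1-to-1 with a worker process; close() is a
// default no-op so callers can use try-with-resources from day one.
abstract class WorkerSession extends AutoCloseable {
  override def close(): Unit = ()
}

// A security boundary for worker connection pooling. Declaring
// equals/hashCode abstract forces subclasses to define structural
// equality, so equivalent scopes hash to the same pool key.
abstract class WorkerSecurityScope {
  override def equals(other: Any): Boolean
  override def hashCode(): Int
}

// Manages workers for a given spec and hands out sessions; pooling,
// reuse, and lifecycle are implementation details behind this trait.
trait WorkerDispatcher extends AutoCloseable {
  def createSession(scope: WorkerSecurityScope): WorkerSession // assumed name
}
```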

Build integration:

  • Maven and SBT build definitions for both sub-modules.
  • project/SparkBuild.scala updated to register the new modules and configure unidoc exclusions (JavaUnidoc only — Scala API docs are included).

Why are the changes needed?

This is the first step toward a language-agnostic UDF protocol for Spark that enables UDF workers written in any language to communicate with the Spark engine through a well-defined specification and API boundary. The abstractions introduced here establish the core contract that concrete implementations (e.g., process-based or gRPC-based workers) will build on.

Why introduce a separate root-level module:

  1. The worker specification module is not specific to Spark Connect—it should also support PySpark workers in the classic (non-Connect) mode.
  2. The module has minimal dependency on Spark internals or the SQL engine, making it a poor fit for existing core or sql modules.
  3. Keeping it as a separate module helps maintain a clear focus on worker abstractions and improves modularity.

Does this PR introduce any user-facing change?

No. All new APIs are marked @Experimental and there are no behavioral changes to existing code.

How was this patch tested?

  • Compilation verified via both Maven and SBT.
  • WorkerAbstractionSuite provides a basic test placeholder.
  • Scaladoc generation verified via build/sbt unidoc (ScalaUnidoc succeeds; JavaUnidoc excludes udf-worker modules, consistent with how connectCommon/connect/protobuf modules are handled).

Was this patch authored or co-authored using generative AI tooling?

Yes.

Contributor

@dtenedor dtenedor left a comment

LGTM as a start. It conforms to the SPIP as documented so far.

We can leave it open for some time for the community to take a look and possibly comment.

Contributor

@cloud-fan cloud-fan left a comment


Summary

Prior state and problem: Spark has no language-agnostic UDF protocol. Python UDFs use an in-process pipe model (BasePythonRunner), which doesn't generalize to other languages with separate worker processes.

Design approach: This PR establishes a new udf/worker module split into proto/ (protobuf wire format + Scala wrapper) and core/ (engine-side abstractions). The design follows a Dispatcher → Session pattern: WorkerDispatcher manages worker lifecycle and pooling, WorkerSession represents a single UDF execution, and WorkerSecurityScope partitions the worker pool by security boundary.
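An engine-side caller following the Dispatcher → Session pattern would look roughly like the sketch below. All names here (PooledDispatcher, UserScope, createSession) are hypothetical stand-ins so the flow compiles on its own; only the shape mirrors the design.

```scala
import scala.util.Using

// Stand-in for WorkerSession: default no-op close().
abstract class WorkerSession extends AutoCloseable {
  override def close(): Unit = ()
}

// Plays the WorkerSecurityScope role; case class gives structural equality.
case class UserScope(user: String)

trait WorkerDispatcher extends AutoCloseable {
  def createSession(scope: UserScope): WorkerSession
}

// Toy dispatcher that keeps one pool slot per security scope,
// illustrating how structural scope equality enables worker reuse.
class PooledDispatcher extends WorkerDispatcher {
  private val pool = scala.collection.mutable.Map.empty[UserScope, Int]
  def createSession(scope: UserScope): WorkerSession = {
    pool.updateWith(scope)(n => Some(n.getOrElse(0) + 1)) // same scope, same slot
    new WorkerSession {}
  }
  def poolSize: Int = pool.size
  override def close(): Unit = pool.clear()
}

// Sessions are short-lived; the dispatcher outlives them and is
// closed once at the end.
Using.resource(new PooledDispatcher) { dispatcher =>
  Using.resource(dispatcher.createSession(UserScope("alice"))) { session =>
    // run one UDF execution against `session` here
  }
}
```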

Key design decisions:

  • WorkerDispatcher is a trait extending AutoCloseable, making implementations responsible for resource cleanup
  • WorkerSession and WorkerSecurityScope are abstract classes with no methods — placeholders for concrete implementations in follow-up PRs
  • The proto module overrides protobuf-java scope from provided (root POM) to compile so generated classes are available on the compile classpath
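The scope override described in the last bullet would look roughly like this in the proto module's pom.xml. This is a sketch, not the PR's actual file; the coordinates are the standard protobuf-java artifact, and version inheritance from the root POM's dependencyManagement is assumed.

```xml
<!-- Root POM marks protobuf-java as provided; the proto module
     re-declares it at compile scope so generated classes resolve
     on the compile classpath. -->
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <scope>compile</scope>
</dependency>
```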

Implementation sketch: The module is registered in the Maven reactor (pom.xml), SBT build (SparkBuild.scala — project refs, MiMa exclusions, protobuf codegen), and CI (modules.py). All new APIs are @Experimental.

General comments

  • The PR description says WorkerSpec is in core/, but the typed wrapper is actually WorkerSpecification in proto/. The README has the correct information — the PR description should be updated to match.

* implementation based on the [[WorkerSpecification]].
*/
@Experimental
abstract class WorkerSession
Contributor

WorkerSession does not extend AutoCloseable, while WorkerDispatcher does. Since a session "can carry per-execution state" and implementations "may add lifecycle hooks," callers have no standard way to release per-session resources. If AutoCloseable is added later, all callers must be updated. Consider abstract class WorkerSession extends AutoCloseable from the start, even if concrete implementations initially no-op on close().
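The suggested shape can be sketched as follows (hypothetical code mirroring the reviewer's proposal, not the merged source; BufferedSession is an invented example implementation):

```scala
// Extend AutoCloseable from the start with a no-op default, so callers
// can adopt try-with-resources (or scala.util.Using) immediately.
abstract class WorkerSession extends AutoCloseable {
  override def close(): Unit = () // default no-op
}

// An implementation that does hold per-session state simply overrides
// close(); no call site written against the base type has to change.
class BufferedSession extends WorkerSession {
  private val buffer = new java.io.ByteArrayOutputStream()
  var closed = false
  def write(bytes: Array[Byte]): Unit = buffer.write(bytes)
  override def close(): Unit = { closed = true; buffer.reset() }
}

// Caller code is identical for no-op and stateful sessions:
scala.util.Using.resource(new BufferedSession) { s =>
  s.write("payload".getBytes)
}
```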

Contributor Author

Fixed, added AutoCloseable.

* Workers are only reused within the same security scope.
*/
@Experimental
abstract class WorkerSecurityScope
Contributor

The Scaladoc says "Workers are only reused within the same security scope," which implies dispatcher implementations will compare scopes for equality. The default Object.equals (reference equality) means structurally equivalent scopes won't match, silently preventing worker reuse. Consider documenting that subclasses must override equals/hashCode, or making this a sealed trait with concrete implementations that enforce it.
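The failure mode described here is easy to reproduce. A sketch with hypothetical scope classes (RefScope, UserScope are invented for illustration):

```scala
// Without structural equality, two scopes describing the same boundary
// are different pool keys, silently defeating worker reuse.
class RefScope(val user: String) // inherits reference equals/hashCode

val refPool = Map(new RefScope("alice") -> "worker-1")
refPool.contains(new RefScope("alice")) // false: lookup misses, no reuse

// A case class gets structural equals/hashCode for free, so equivalent
// scopes map to the same pooled worker.
case class UserScope(user: String)

val pool = Map(UserScope("alice") -> "worker-1")
pool.contains(UserScope("alice")) // true: worker is reused
```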

Contributor Author

Fixed, making equals/hashCode mandatory.

@haiyangsun-db haiyangsun-db changed the title [SPARK-55278] Introduce core abstraction for UDF worker. [SPARK-55278] Introduce module and core abstraction for language-agnostic UDF worker. Apr 1, 2026
@haiyangsun-db
Contributor Author

Hi @holdenk, this is the first PR for the language-agnostic UDF work. The main goal of this PR is to set up the module and a few key worker abstractions. More detailed pieces will follow in subsequent PRs. Please feel free to take a look.
