From dd310b2ca231d99d3b0abbb91c1ea6a9a045cca6 Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Mon, 30 Mar 2026 07:04:38 +0000 Subject: [PATCH 1/6] Introduce core abstraction for the worker. --- udf/worker/README.md | 47 ++++++++++++ udf/worker/core/pom.xml | 67 +++++++++++++++++ .../udf/worker/core/WorkerDispatcher.scala | 46 ++++++++++++ .../udf/worker/core/WorkerSecurityScope.scala | 28 +++++++ .../spark/udf/worker/core/WorkerSession.scala | 35 +++++++++ .../spark/udf/worker/core/WorkerSpec.scala | 28 +++++++ udf/worker/pom.xml | 40 ++++++++++ udf/worker/proto/pom.xml | 73 +++++++++++++++++++ .../proto/src/main/protobuf/worker_spec.proto | 28 +++++++ 9 files changed, 392 insertions(+) create mode 100644 udf/worker/README.md create mode 100644 udf/worker/core/pom.xml create mode 100644 udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala create mode 100644 udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala create mode 100644 udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala create mode 100644 udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala create mode 100644 udf/worker/pom.xml create mode 100644 udf/worker/proto/pom.xml create mode 100644 udf/worker/proto/src/main/protobuf/worker_spec.proto diff --git a/udf/worker/README.md b/udf/worker/README.md new file mode 100644 index 0000000000000..242fd7e6ef374 --- /dev/null +++ b/udf/worker/README.md @@ -0,0 +1,47 @@ +# udf/worker -- Language-agnostic UDF Worker Framework + +Package structure for the UDF worker framework described in +[SPIP SPARK-55278](https://issues.apache.org/jira/browse/SPARK-55278). + +## Overview + +Spark processes a UDF by first obtaining a **WorkerDispatcher** from the worker +specification (plus context such as security scope). The dispatcher manages the +actual worker processes behind the scenes -- pooling, reuse, and termination are +all invisible to Spark. + +From the dispatcher, Spark gets a **WorkerSession**, which represents one single +UDF execution and can carry per-execution state. A WorkerSession is not 1-to-1 +mapped to an actual worker -- multiple sessions may share the same underlying +worker when it is reused. Worker reuse is managed by each dispatcher +implementation based on the worker specification. + +## Sub-packages + +``` +udf/worker/ +├── proto/ Protobuf definition of the worker specification +│ (UDFWorkerSpecification -- currently a placeholder). +└── core/ Engine-side APIs (all @Experimental): + WorkerSpec -- typed Scala wrapper around the protobuf spec. + WorkerDispatcher -- manages workers for one spec; creates sessions. + WorkerSession -- represents one single UDF execution. + WorkerSecurityScope -- security boundary for connection pooling. +``` + +## Build + +SBT: +``` +build/sbt "project udf-worker-core" compile +``` + +Maven: +``` +mvn -pl udf/worker -am compile +``` + +## Design references + +* [SPIP Language-agnostic UDF Protocol for Spark](https://docs.google.com/document/d/19Whzq127QxVt2Luk0EClgaDtcpBsFUp67NcVdKKyPF8/edit?tab=t.0) +* [SPIP Language-agnostic UDF Protocol for Spark -- Worker Specification](https://docs.google.com/document/d/1Dx9NqHRNuUpatH9DYoFF9cmvUl2fqHT4Rjbyw4EGLHs/edit?tab=t.0#heading=h.4h01j4b8rjzv) diff --git a/udf/worker/core/pom.xml b/udf/worker/core/pom.xml new file mode 100644 index 0000000000000..369789ab60cba --- /dev/null +++ b/udf/worker/core/pom.xml @@ -0,0 +1,67 @@ + + + + + 4.0.0 + + org.apache.spark + spark-udf-worker-parent_2.13 + 4.2.0-SNAPSHOT + ../pom.xml + + + spark-udf-worker-core_2.13 + jar + Spark Project UDF Worker Core + https://spark.apache.org/ + + + udf-worker-core + + + + + org.apache.spark + spark-tags_${scala.binary.version} + + + org.apache.spark + spark-udf-worker-proto_${scala.binary.version} + ${project.version} + + + org.scala-lang + scala-library + ${scala.version} + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + net.alchim31.maven + scala-maven-plugin + + + + diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala new file mode 100644 index 0000000000000..eeee9ea8e3f50 --- /dev/null +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Manages workers for a single [[WorkerSpec]] and hides worker details from Spark. + * + * A [[WorkerDispatcher]] is created from a worker specification (plus context such + * as security scope). It owns the underlying worker processes and connections, + * handling pooling, reuse, and lifecycle behind the scenes. Spark interacts with + * workers exclusively through the [[WorkerSession]]s returned by [[createSession]]. + */ +@Experimental +trait WorkerDispatcher extends AutoCloseable { + + def workerSpec: WorkerSpec + + /** + * Creates a [[WorkerSession]] that maps to one single UDF execution. + * + * @param securityScope identifies which pool of workers may be reused for this + * session. Dispatcher implementations use the scope to + * decide whether an existing worker can be shared or a new + * one must be created. + */ + def createSession(securityScope: Option[WorkerSecurityScope]): WorkerSession + + override def close(): Unit +} diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala new file mode 100644 index 0000000000000..c7f998bb939da --- /dev/null +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Identifies a security boundary for worker connection pooling. + * + * Workers are only reused within the same security scope. + */ +@Experimental +abstract class WorkerSecurityScope diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala new file mode 100644 index 0000000000000..61b26d1319600 --- /dev/null +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represents one single UDF execution. + * + * A [[WorkerSession]] is obtained from [[WorkerDispatcher#createSession]] and + * can carry per-execution state for that UDF invocation. Implementations may + * add concrete data-processing methods and lifecycle hooks as needed. + * + * A WorkerSession is not 1-to-1 mapped to an actual worker process. Multiple + * WorkerSessions may be backed by the same worker when the worker is reused. + * Worker reuse and pooling are managed by each [[WorkerDispatcher]] + * implementation based on the [[WorkerSpec]]. + */ +@Experimental +abstract class WorkerSession diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala new file mode 100644 index 0000000000000..13443864b5277 --- /dev/null +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import org.apache.spark.annotation.Experimental +import org.apache.spark.udf.worker.worker_spec.UDFWorkerSpecification + +/** + * :: Experimental :: + * Typed Scala wrapper around the protobuf [[UDFWorkerSpecification]]. + */ +@Experimental +class WorkerSpec(val proto: UDFWorkerSpecification) { +} diff --git a/udf/worker/pom.xml b/udf/worker/pom.xml new file mode 100644 index 0000000000000..8ed1eb806f05d --- /dev/null +++ b/udf/worker/pom.xml @@ -0,0 +1,40 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.13 + 4.2.0-SNAPSHOT + ../../pom.xml + + + spark-udf-worker-parent_2.13 + pom + Spark Project UDF Worker (Parent) + https://spark.apache.org/ + + + proto + core + + diff --git a/udf/worker/proto/pom.xml b/udf/worker/proto/pom.xml new file mode 100644 index 0000000000000..2730892b1fdc6 --- /dev/null +++ b/udf/worker/proto/pom.xml @@ -0,0 +1,73 @@ + + + + + 4.0.0 + + org.apache.spark + spark-udf-worker-parent_2.13 + 4.2.0-SNAPSHOT + ../pom.xml + + + spark-udf-worker-proto_2.13 + jar + Spark Project UDF Worker Proto + https://spark.apache.org/ + + + udf-worker-proto + + + + + com.google.protobuf + protobuf-java + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + com.github.os72 + protoc-jar-maven-plugin + ${protoc-jar-maven-plugin.version} + + + generate-sources + + run + + + com.google.protobuf:protoc:${protobuf.version} + ${protobuf.version} + + src/main/protobuf + + + + + + + + diff --git a/udf/worker/proto/src/main/protobuf/worker_spec.proto b/udf/worker/proto/src/main/protobuf/worker_spec.proto new file mode 100644 index 0000000000000..92dfccd6ef108 --- /dev/null +++ b/udf/worker/proto/src/main/protobuf/worker_spec.proto @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +package org.apache.spark.udf.worker; + +option java_package = "org.apache.spark.udf.worker"; +option java_multiple_files = true; + +// Placeholder -- full worker specification schema to be added. +// See design doc: SPIP SPARK-55278. +message UDFWorkerSpecification { +} From 0aa577e5a2d049f5f5c7ad90ee5965991b5dcf19 Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Tue, 31 Mar 2026 19:32:58 +0000 Subject: [PATCH 2/6] fix sbt/maven build. add dummy test suite. --- dev/sparktestsupport/modules.py | 11 +++++++ pom.xml | 1 + project/SparkBuild.scala | 32 +++++++++++++++++-- udf/worker/README.md | 6 ++-- udf/worker/core/pom.xml | 1 - .../spark/udf/worker/core/WorkerSpec.scala | 2 +- .../worker/core/WorkerAbstractionSuite.scala | 25 +++++++++++++++ udf/worker/proto/pom.xml | 1 + 8 files changed, 73 insertions(+), 6 deletions(-) create mode 100644 udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/WorkerAbstractionSuite.scala diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 4fd8080c6d62b..df8c995648e69 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -222,6 +222,17 @@ def __hash__(self): sbt_test_goals=["variant/test"], ) +udf_worker = Module( + name="udf-worker", + dependencies=[tags], + source_file_regexes=[ + "udf/worker/", + ], + sbt_test_goals=[ + "udf-worker-core/test", + ], +) + core = Module( name="core", dependencies=[kvstore, network_common, network_shuffle, unsafe, launcher, utils], diff --git a/pom.xml b/pom.xml index 7f2a321346cb9..69c363e488894 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,7 @@ connector/kafka-0-10-sql connector/avro connector/protobuf + udf/worker diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e2782b3b4b482..0a26af9add6bf 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -59,13 +59,17 @@ object BuildCommons { Seq("connect-common", "connect", "connect-client-jdbc", "connect-client-jvm", "connect-shims") .map(ProjectRef(buildLocation, _)) + val udfWorkerProjects@Seq(udfWorkerProto, udfWorkerCore) = + Seq("udf-worker-proto", "udf-worker-core").map(ProjectRef(buildLocation, _)) + val allProjects@Seq( core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, commonUtils, commonUtilsJava, variant, pipelines, _* ) = Seq( "core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe", "tags", "sketch", "kvstore", "common-utils", "common-utils-java", "variant", "pipelines" - ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ connectProjects + ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ connectProjects ++ + udfWorkerProjects val optionallyEnabledProjects@Seq(kubernetes, yarn, sparkGangliaLgpl, streamingKinesisAsl, profiler, @@ -405,7 +409,8 @@ object SparkBuild extends PomBuild { Seq( spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn, unsafe, tags, tokenProviderKafka010, sqlKafka010, pipelines, connectCommon, connect, - connectJdbc, connectClient, variant, connectShims, profiler, commonUtilsJava + connectJdbc, connectClient, variant, connectShims, profiler, commonUtilsJava, + udfWorkerProto, udfWorkerCore ).contains(x) } @@ -458,6 +463,9 @@ object SparkBuild extends PomBuild { /* Protobuf settings */ enable(SparkProtobuf.settings)(protobuf) + /* UDF Worker Proto settings */ + enable(UDFWorkerProto.settings)(udfWorkerProto) + enable(DockerIntegrationTests.settings)(dockerIntegrationTests) enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests) @@ -1077,6 +1085,26 @@ object SparkProtobuf { } } +object UDFWorkerProto { + import BuildCommons.protoVersion + lazy val settings = Seq( + PB.protocVersion := BuildCommons.protoVersion, + libraryDependencies += "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf", + (Compile / PB.targets) := Seq( + PB.gens.java -> target.value / "generated-sources" + ) + ) ++ { + val sparkProtocExecPath = sys.props.get("spark.protoc.executable.path") + if (sparkProtocExecPath.isDefined) { + Seq( + PB.protocExecutable := file(sparkProtocExecPath.get) + ) + } else { + Seq.empty + } + } +} + object Unsafe { lazy val settings = Seq() } diff --git a/udf/worker/README.md b/udf/worker/README.md index 242fd7e6ef374..16bd2262d2258 100644 --- a/udf/worker/README.md +++ b/udf/worker/README.md @@ -33,12 +33,14 @@ udf/worker/ SBT: ``` -build/sbt "project udf-worker-core" compile +build/sbt "udf-worker-core/compile" +build/sbt "udf-worker-core/test" ``` Maven: ``` -mvn -pl udf/worker -am compile +./build/mvn -pl udf/worker/proto,udf/worker/core -am compile +./build/mvn -pl udf/worker/proto,udf/worker/core -am test ``` ## Design references diff --git a/udf/worker/core/pom.xml b/udf/worker/core/pom.xml index 369789ab60cba..8a701975f0e53 100644 --- a/udf/worker/core/pom.xml +++ b/udf/worker/core/pom.xml @@ -50,7 +50,6 @@ org.scala-lang scala-library - ${scala.version} diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala index 13443864b5277..8c3436da40e78 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala @@ -17,7 +17,7 @@ package org.apache.spark.udf.worker.core import org.apache.spark.annotation.Experimental -import org.apache.spark.udf.worker.worker_spec.UDFWorkerSpecification +import org.apache.spark.udf.worker.UDFWorkerSpecification /** * :: Experimental :: diff --git a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/WorkerAbstractionSuite.scala b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/WorkerAbstractionSuite.scala new file mode 100644 index 0000000000000..42f53af07424a --- /dev/null +++ b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/WorkerAbstractionSuite.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite + +class WorkerAbstractionSuite + extends AnyFunSuite { // scalastyle:ignore funsuite + + test("dummy") {} +} diff --git a/udf/worker/proto/pom.xml b/udf/worker/proto/pom.xml index 2730892b1fdc6..ac346cfe56a41 100644 --- a/udf/worker/proto/pom.xml +++ b/udf/worker/proto/pom.xml @@ -41,6 +41,7 @@ com.google.protobuf protobuf-java + compile From fe9c5b6f6143cd1969a83b494286bf7d97030af4 Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Tue, 31 Mar 2026 20:49:27 +0000 Subject: [PATCH 3/6] fix CI sbt issue. --- pom.xml | 3 ++- udf/worker/core/pom.xml | 4 ++-- udf/worker/pom.xml | 40 ---------------------------------------- udf/worker/proto/pom.xml | 13 +++++++++++-- 4 files changed, 15 insertions(+), 45 deletions(-) delete mode 100644 udf/worker/pom.xml diff --git a/pom.xml b/pom.xml index 69c363e488894..5745674b1531d 100644 --- a/pom.xml +++ b/pom.xml @@ -111,7 +111,8 @@ connector/kafka-0-10-sql connector/avro connector/protobuf - udf/worker + udf/worker/proto + udf/worker/core diff --git a/udf/worker/core/pom.xml b/udf/worker/core/pom.xml index 8a701975f0e53..69088d284365f 100644 --- a/udf/worker/core/pom.xml +++ b/udf/worker/core/pom.xml @@ -23,9 +23,9 @@ 4.0.0 org.apache.spark - spark-udf-worker-parent_2.13 + spark-parent_2.13 4.2.0-SNAPSHOT - ../pom.xml + ../../../pom.xml spark-udf-worker-core_2.13 diff --git a/udf/worker/pom.xml b/udf/worker/pom.xml deleted file mode 100644 index 8ed1eb806f05d..0000000000000 --- a/udf/worker/pom.xml +++ /dev/null @@ -1,40 +0,0 @@ - - - - - 4.0.0 - - org.apache.spark - spark-parent_2.13 - 4.2.0-SNAPSHOT - ../../pom.xml - - - spark-udf-worker-parent_2.13 - pom - Spark Project UDF Worker (Parent) - https://spark.apache.org/ - - - proto - core - - diff --git a/udf/worker/proto/pom.xml b/udf/worker/proto/pom.xml index ac346cfe56a41..954f46b6eefe8 100644 --- a/udf/worker/proto/pom.xml +++ b/udf/worker/proto/pom.xml @@ -23,9 +23,9 @@ 4.0.0 org.apache.spark - spark-udf-worker-parent_2.13 + spark-parent_2.13 4.2.0-SNAPSHOT - ../pom.xml + ../../../pom.xml spark-udf-worker-proto_2.13 @@ -69,6 +69,15 @@ + + + net.alchim31.maven + scala-maven-plugin + + true + true + + From 89d43688ca9ced8dd5d8196f0f528c4e9cf50dda Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Wed, 1 Apr 2026 09:00:12 +0000 Subject: [PATCH 4/6] fix sbt issue. --- udf/worker/README.md | 2 +- .../spark/udf/worker/core/WorkerDispatcher.scala | 5 +++-- .../spark/udf/worker/core/WorkerSession.scala | 2 +- udf/worker/proto/pom.xml | 13 ++++++++----- .../spark/udf/worker/WorkerSpecification.scala} | 5 ++--- 5 files changed, 15 insertions(+), 12 deletions(-) rename udf/worker/{core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala => proto/src/main/scala/org/apache/spark/udf/worker/WorkerSpecification.scala} (86%) diff --git a/udf/worker/README.md b/udf/worker/README.md index 16bd2262d2258..f13ce85d4dad1 100644 --- a/udf/worker/README.md +++ b/udf/worker/README.md @@ -22,8 +22,8 @@ implementation based on the worker specification. udf/worker/ ├── proto/ Protobuf definition of the worker specification │ (UDFWorkerSpecification -- currently a placeholder). +│ WorkerSpecification -- typed Scala wrapper around the protobuf spec. └── core/ Engine-side APIs (all @Experimental): - WorkerSpec -- typed Scala wrapper around the protobuf spec. WorkerDispatcher -- manages workers for one spec; creates sessions. WorkerSession -- represents one single UDF execution. WorkerSecurityScope -- security boundary for connection pooling. diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala index eeee9ea8e3f50..58fabbaea00df 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala @@ -17,10 +17,11 @@ package org.apache.spark.udf.worker.core import org.apache.spark.annotation.Experimental +import org.apache.spark.udf.worker.WorkerSpecification /** * :: Experimental :: - * Manages workers for a single [[WorkerSpec]] and hides worker details from Spark. + * Manages workers for a single [[WorkerSpecification]] and hides worker details from Spark. * * A [[WorkerDispatcher]] is created from a worker specification (plus context such * as security scope). It owns the underlying worker processes and connections, @@ -30,7 +31,7 @@ import org.apache.spark.annotation.Experimental @Experimental trait WorkerDispatcher extends AutoCloseable { - def workerSpec: WorkerSpec + def workerSpec: WorkerSpecification /** * Creates a [[WorkerSession]] that maps to one single UDF execution. diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala index 61b26d1319600..2dde5b1e2413a 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala @@ -29,7 +29,7 @@ import org.apache.spark.annotation.Experimental * A WorkerSession is not 1-to-1 mapped to an actual worker process. Multiple * WorkerSessions may be backed by the same worker when the worker is reused. * Worker reuse and pooling are managed by each [[WorkerDispatcher]] - * implementation based on the [[WorkerSpec]]. + * implementation based on the [[WorkerSpecification]]. */ @Experimental abstract class WorkerSession diff --git a/udf/worker/proto/pom.xml b/udf/worker/proto/pom.xml index 954f46b6eefe8..a91921122b8df 100644 --- a/udf/worker/proto/pom.xml +++ b/udf/worker/proto/pom.xml @@ -43,6 +43,14 @@ protobuf-java compile + + org.apache.spark + spark-tags_${scala.binary.version} + + + org.scala-lang + scala-library + @@ -69,14 +77,9 @@ - net.alchim31.maven scala-maven-plugin - - true - true - diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala b/udf/worker/proto/src/main/scala/org/apache/spark/udf/worker/WorkerSpecification.scala similarity index 86% rename from udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala rename to udf/worker/proto/src/main/scala/org/apache/spark/udf/worker/WorkerSpecification.scala index 8c3436da40e78..e25b99b69990c 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSpec.scala +++ b/udf/worker/proto/src/main/scala/org/apache/spark/udf/worker/WorkerSpecification.scala @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.udf.worker.core +package org.apache.spark.udf.worker import org.apache.spark.annotation.Experimental -import org.apache.spark.udf.worker.UDFWorkerSpecification /** * :: Experimental :: * Typed Scala wrapper around the protobuf [[UDFWorkerSpecification]]. */ @Experimental -class WorkerSpec(val proto: UDFWorkerSpecification) { +class WorkerSpecification(val proto: UDFWorkerSpecification) { } From ccd67fcfa2efc9c1237c5426d75baa214c7ef773 Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Wed, 1 Apr 2026 12:17:41 +0000 Subject: [PATCH 5/6] fix javadoc failure --- project/SparkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 0a26af9add6bf..3d993e74f8846 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1682,7 +1682,7 @@ object Unidoc { (JavaUnidoc / unidoc / unidocProjectFilter) := inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes, yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectJdbc, - connectClient, connectShims, protobuf, profiler), + connectClient, connectShims, protobuf, profiler, udfWorkerProto, udfWorkerCore), ) } From a0329ad9a5a5a7901c7c091fe41c88eefd79c38c Mon Sep 17 00:00:00 2001 From: Haiyang Sun Date: Wed, 1 Apr 2026 12:20:21 +0000 Subject: [PATCH 6/6] address comments. --- .../spark/udf/worker/core/WorkerSecurityScope.scala | 11 +++++++++-- .../apache/spark/udf/worker/core/WorkerSession.scala | 4 +++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala index c7f998bb939da..dc19b0b9f867a 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala @@ -22,7 +22,14 @@ import org.apache.spark.annotation.Experimental * :: Experimental :: * Identifies a security boundary for worker connection pooling. * - * Workers are only reused within the same security scope. + * Workers are only reused within the same security scope. Dispatcher + * implementations compare scopes using `equals` to decide whether an + * existing worker can be shared. Subclasses '''must''' override + * `equals` and `hashCode` so that structurally equivalent scopes + * match; otherwise, worker reuse will silently fail. */ @Experimental -abstract class WorkerSecurityScope +abstract class WorkerSecurityScope { + override def equals(obj: Any): Boolean + override def hashCode(): Int +} diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala index 2dde5b1e2413a..83c392a895b66 100644 --- a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala @@ -32,4 +32,6 @@ import org.apache.spark.annotation.Experimental * implementation based on the [[WorkerSpecification]]. */ @Experimental -abstract class WorkerSession +abstract class WorkerSession extends AutoCloseable { + override def close(): Unit = {} +}