diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 4fd8080c6d62b..df8c995648e69 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -222,6 +222,17 @@ def __hash__(self): sbt_test_goals=["variant/test"], ) +udf_worker = Module( + name="udf-worker", + dependencies=[tags], + source_file_regexes=[ + "udf/worker/", + ], + sbt_test_goals=[ + "udf-worker-core/test", + ], +) + core = Module( name="core", dependencies=[kvstore, network_common, network_shuffle, unsafe, launcher, utils], diff --git a/pom.xml b/pom.xml index 7f2a321346cb9..5745674b1531d 100644 --- a/pom.xml +++ b/pom.xml @@ -111,6 +111,8 @@ connector/kafka-0-10-sql connector/avro connector/protobuf + udf/worker/proto + udf/worker/core diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index e2782b3b4b482..3d993e74f8846 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -59,13 +59,17 @@ object BuildCommons { Seq("connect-common", "connect", "connect-client-jdbc", "connect-client-jvm", "connect-shims") .map(ProjectRef(buildLocation, _)) + val udfWorkerProjects@Seq(udfWorkerProto, udfWorkerCore) = + Seq("udf-worker-proto", "udf-worker-core").map(ProjectRef(buildLocation, _)) + val allProjects@Seq( core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, commonUtils, commonUtilsJava, variant, pipelines, _* ) = Seq( "core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe", "tags", "sketch", "kvstore", "common-utils", "common-utils-java", "variant", "pipelines" - ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ connectProjects + ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ connectProjects ++ + udfWorkerProjects val optionallyEnabledProjects@Seq(kubernetes, yarn, sparkGangliaLgpl, streamingKinesisAsl, profiler, @@ -405,7 +409,8 @@ object SparkBuild extends PomBuild { Seq( spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn, unsafe, tags, tokenProviderKafka010, sqlKafka010, pipelines, connectCommon, connect, - connectJdbc, connectClient, variant, connectShims, profiler, commonUtilsJava + connectJdbc, connectClient, variant, connectShims, profiler, commonUtilsJava, + udfWorkerProto, udfWorkerCore ).contains(x) } @@ -458,6 +463,9 @@ object SparkBuild extends PomBuild { /* Protobuf settings */ enable(SparkProtobuf.settings)(protobuf) + /* UDF Worker Proto settings */ + enable(UDFWorkerProto.settings)(udfWorkerProto) + enable(DockerIntegrationTests.settings)(dockerIntegrationTests) enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests) @@ -1077,6 +1085,26 @@ object SparkProtobuf { } } +object UDFWorkerProto { + import BuildCommons.protoVersion + lazy val settings = Seq( + PB.protocVersion := BuildCommons.protoVersion, + libraryDependencies += "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf", + (Compile / PB.targets) := Seq( + PB.gens.java -> target.value / "generated-sources" + ) + ) ++ { + val sparkProtocExecPath = sys.props.get("spark.protoc.executable.path") + if (sparkProtocExecPath.isDefined) { + Seq( + PB.protocExecutable := file(sparkProtocExecPath.get) + ) + } else { + Seq.empty + } + } +} + object Unsafe { lazy val settings = Seq() } @@ -1654,7 +1682,7 @@ object Unidoc { (JavaUnidoc / unidoc / unidocProjectFilter) := inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes, yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectJdbc, - connectClient, connectShims, protobuf, profiler), + connectClient, connectShims, protobuf, profiler, udfWorkerProto, udfWorkerCore), ) } diff --git a/udf/worker/README.md b/udf/worker/README.md new file mode 100644 index 0000000000000..f13ce85d4dad1 --- /dev/null +++ b/udf/worker/README.md @@ -0,0 +1,49 @@ +# udf/worker -- Language-agnostic UDF Worker Framework + +Package structure for the UDF worker framework described in +[SPIP SPARK-55278](https://issues.apache.org/jira/browse/SPARK-55278). + +## Overview + +Spark processes a UDF by first obtaining a **WorkerDispatcher** from the worker +specification (plus context such as security scope). The dispatcher manages the +actual worker processes behind the scenes -- pooling, reuse, and termination are +all invisible to Spark. + +From the dispatcher, Spark gets a **WorkerSession**, which represents one single +UDF execution and can carry per-execution state. A WorkerSession is not 1-to-1 +mapped to an actual worker -- multiple sessions may share the same underlying +worker when it is reused. Worker reuse is managed by each dispatcher +implementation based on the worker specification. + +## Sub-packages + +``` +udf/worker/ +├── proto/ Protobuf definition of the worker specification +│ (UDFWorkerSpecification -- currently a placeholder). +│ WorkerSpecification -- typed Scala wrapper around the protobuf spec. +└── core/ Engine-side APIs (all @Experimental): + WorkerDispatcher -- manages workers for one spec; creates sessions. + WorkerSession -- represents one single UDF execution. + WorkerSecurityScope -- security boundary for connection pooling. +``` + +## Build + +SBT: +``` +build/sbt "udf-worker-core/compile" +build/sbt "udf-worker-core/test" +``` + +Maven: +``` +./build/mvn -pl udf/worker/proto,udf/worker/core -am compile +./build/mvn -pl udf/worker/proto,udf/worker/core -am test +``` + +## Design references + +* [SPIP Language-agnostic UDF Protocol for Spark](https://docs.google.com/document/d/19Whzq127QxVt2Luk0EClgaDtcpBsFUp67NcVdKKyPF8/edit?tab=t.0) +* [SPIP Language-agnostic UDF Protocol for Spark -- Worker Specification](https://docs.google.com/document/d/1Dx9NqHRNuUpatH9DYoFF9cmvUl2fqHT4Rjbyw4EGLHs/edit?tab=t.0#heading=h.4h01j4b8rjzv) diff --git a/udf/worker/core/pom.xml b/udf/worker/core/pom.xml new file mode 100644 index 0000000000000..69088d284365f --- /dev/null +++ b/udf/worker/core/pom.xml @@ -0,0 +1,66 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.13 + 4.2.0-SNAPSHOT + ../../../pom.xml + + + spark-udf-worker-core_2.13 + jar + Spark Project UDF Worker Core + https://spark.apache.org/ + + + udf-worker-core + + + + + org.apache.spark + spark-tags_${scala.binary.version} + + + org.apache.spark + spark-udf-worker-proto_${scala.binary.version} + ${project.version} + + + org.scala-lang + scala-library + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + net.alchim31.maven + scala-maven-plugin + + + + diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala new file mode 100644 index 0000000000000..58fabbaea00df --- /dev/null +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerDispatcher.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import org.apache.spark.annotation.Experimental +import org.apache.spark.udf.worker.WorkerSpecification + +/** + * :: Experimental :: + * Manages workers for a single [[WorkerSpecification]] and hides worker details from Spark. + * + * A [[WorkerDispatcher]] is created from a worker specification (plus context such + * as security scope). It owns the underlying worker processes and connections, + * handling pooling, reuse, and lifecycle behind the scenes. Spark interacts with + * workers exclusively through the [[WorkerSession]]s returned by [[createSession]]. + */ +@Experimental +trait WorkerDispatcher extends AutoCloseable { + + def workerSpec: WorkerSpecification + + /** + * Creates a [[WorkerSession]] that maps to one single UDF execution. + * + * @param securityScope identifies which pool of workers may be reused for this + * session. Dispatcher implementations use the scope to + * decide whether an existing worker can be shared or a new + * one must be created. + */ + def createSession(securityScope: Option[WorkerSecurityScope]): WorkerSession + + override def close(): Unit +} diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala new file mode 100644 index 0000000000000..dc19b0b9f867a --- /dev/null +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSecurityScope.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Identifies a security boundary for worker connection pooling. + * + * Workers are only reused within the same security scope. Dispatcher + * implementations compare scopes using `equals` to decide whether an + * existing worker can be shared. Subclasses '''must''' override + * `equals` and `hashCode` so that structurally equivalent scopes + * match; otherwise, worker reuse will silently fail. + */ +@Experimental +abstract class WorkerSecurityScope { + override def equals(obj: Any): Boolean + override def hashCode(): Int +} diff --git a/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala new file mode 100644 index 0000000000000..83c392a895b66 --- /dev/null +++ b/udf/worker/core/src/main/scala/org/apache/spark/udf/worker/core/WorkerSession.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Represents one single UDF execution. + * + * A [[WorkerSession]] is obtained from [[WorkerDispatcher#createSession]] and + * can carry per-execution state for that UDF invocation. Implementations may + * add concrete data-processing methods and lifecycle hooks as needed. + * + * A WorkerSession is not 1-to-1 mapped to an actual worker process. Multiple + * WorkerSessions may be backed by the same worker when the worker is reused. + * Worker reuse and pooling are managed by each [[WorkerDispatcher]] + * implementation based on the [[WorkerSpecification]]. + */ +@Experimental +abstract class WorkerSession extends AutoCloseable { + override def close(): Unit = {} +} diff --git a/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/WorkerAbstractionSuite.scala b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/WorkerAbstractionSuite.scala new file mode 100644 index 0000000000000..42f53af07424a --- /dev/null +++ b/udf/worker/core/src/test/scala/org/apache/spark/udf/worker/core/WorkerAbstractionSuite.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker.core + +import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite + +class WorkerAbstractionSuite + extends AnyFunSuite { // scalastyle:ignore funsuite + + test("dummy") {} +} diff --git a/udf/worker/proto/pom.xml b/udf/worker/proto/pom.xml new file mode 100644 index 0000000000000..a91921122b8df --- /dev/null +++ b/udf/worker/proto/pom.xml @@ -0,0 +1,86 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent_2.13 + 4.2.0-SNAPSHOT + ../../../pom.xml + + + spark-udf-worker-proto_2.13 + jar + Spark Project UDF Worker Proto + https://spark.apache.org/ + + + udf-worker-proto + + + + + com.google.protobuf + protobuf-java + compile + + + org.apache.spark + spark-tags_${scala.binary.version} + + + org.scala-lang + scala-library + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + com.github.os72 + protoc-jar-maven-plugin + ${protoc-jar-maven-plugin.version} + + + generate-sources + + run + + + com.google.protobuf:protoc:${protobuf.version} + ${protobuf.version} + + src/main/protobuf + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + diff --git a/udf/worker/proto/src/main/protobuf/worker_spec.proto b/udf/worker/proto/src/main/protobuf/worker_spec.proto new file mode 100644 index 0000000000000..92dfccd6ef108 --- /dev/null +++ b/udf/worker/proto/src/main/protobuf/worker_spec.proto @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +syntax = "proto3"; + +package org.apache.spark.udf.worker; + +option java_package = "org.apache.spark.udf.worker"; +option java_multiple_files = true; + +// Placeholder -- full worker specification schema to be added. +// See design doc: SPIP SPARK-55278. +message UDFWorkerSpecification { +} diff --git a/udf/worker/proto/src/main/scala/org/apache/spark/udf/worker/WorkerSpecification.scala b/udf/worker/proto/src/main/scala/org/apache/spark/udf/worker/WorkerSpecification.scala new file mode 100644 index 0000000000000..e25b99b69990c --- /dev/null +++ b/udf/worker/proto/src/main/scala/org/apache/spark/udf/worker/WorkerSpecification.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.udf.worker + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Typed Scala wrapper around the protobuf [[UDFWorkerSpecification]]. + */ +@Experimental +class WorkerSpecification(val proto: UDFWorkerSpecification) { +}