Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,17 @@ def __hash__(self):
sbt_test_goals=["variant/test"],
)

# SPARK-55278: test-support module for the language-agnostic UDF worker
# framework. The single source regex covers both udf/worker/proto and
# udf/worker/core; only the core sbt project currently declares tests.
udf_worker = Module(
name="udf-worker",
dependencies=[tags],
source_file_regexes=[
"udf/worker/",
],
sbt_test_goals=[
# udf-worker-proto is codegen-only, so no test goal is listed for it.
"udf-worker-core/test",
],
)

core = Module(
name="core",
dependencies=[kvstore, network_common, network_shuffle, unsafe, launcher, utils],
Expand Down
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@
<module>connector/kafka-0-10-sql</module>
<module>connector/avro</module>
<module>connector/protobuf</module>
<module>udf/worker/proto</module>
<module>udf/worker/core</module>
<!-- See additional modules enabled by profiles below -->
</modules>

Expand Down
34 changes: 31 additions & 3 deletions project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,17 @@ object BuildCommons {
Seq("connect-common", "connect", "connect-client-jdbc", "connect-client-jvm", "connect-shims")
.map(ProjectRef(buildLocation, _))

val udfWorkerProjects@Seq(udfWorkerProto, udfWorkerCore) =
Seq("udf-worker-proto", "udf-worker-core").map(ProjectRef(buildLocation, _))

val allProjects@Seq(
core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore,
commonUtils, commonUtilsJava, variant, pipelines, _*
) = Seq(
"core", "graphx", "mllib", "mllib-local", "repl", "network-common", "network-shuffle", "launcher", "unsafe",
"tags", "sketch", "kvstore", "common-utils", "common-utils-java", "variant", "pipelines"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ connectProjects
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects ++ connectProjects ++
udfWorkerProjects

val optionallyEnabledProjects@Seq(kubernetes, yarn,
sparkGangliaLgpl, streamingKinesisAsl, profiler,
Expand Down Expand Up @@ -405,7 +409,8 @@ object SparkBuild extends PomBuild {
Seq(
spark, hive, hiveThriftServer, repl, networkCommon, networkShuffle, networkYarn,
unsafe, tags, tokenProviderKafka010, sqlKafka010, pipelines, connectCommon, connect,
connectJdbc, connectClient, variant, connectShims, profiler, commonUtilsJava
connectJdbc, connectClient, variant, connectShims, profiler, commonUtilsJava,
udfWorkerProto, udfWorkerCore
).contains(x)
}

Expand Down Expand Up @@ -458,6 +463,9 @@ object SparkBuild extends PomBuild {
/* Protobuf settings */
enable(SparkProtobuf.settings)(protobuf)

/* UDF Worker Proto settings */
enable(UDFWorkerProto.settings)(udfWorkerProto)

enable(DockerIntegrationTests.settings)(dockerIntegrationTests)

enable(KubernetesIntegrationTests.settings)(kubernetesIntegrationTests)
Expand Down Expand Up @@ -1077,6 +1085,26 @@ object SparkProtobuf {
}
}

object UDFWorkerProto {
  import BuildCommons.protoVersion

  /**
   * Sbt settings for the `udf-worker-proto` module: pins the protoc version,
   * adds protobuf-java in the "protobuf" configuration, and generates Java
   * sources from the .proto files into `target/generated-sources`.
   *
   * If the system property `spark.protoc.executable.path` is set, the protoc
   * binary at that path is used instead of the downloaded one (mirrors the
   * override supported by SparkProtobuf's settings).
   */
  lazy val settings: Seq[Def.Setting[_]] = Seq(
    PB.protocVersion := BuildCommons.protoVersion,
    libraryDependencies += "com.google.protobuf" % "protobuf-java" % protoVersion % "protobuf",
    (Compile / PB.targets) := Seq(
      PB.gens.java -> target.value / "generated-sources"
    )
  ) ++ {
    // Idiomatic Option handling instead of isDefined/get: yields zero or one
    // extra setting depending on whether the override property is present.
    sys.props.get("spark.protoc.executable.path").toSeq.map { protocPath =>
      PB.protocExecutable := file(protocPath)
    }
  }
}

object Unsafe {
  // No module-specific sbt settings; the object exists for symmetry with the
  // other per-module settings objects in this file.
  lazy val settings = Seq()
}
Expand Down Expand Up @@ -1654,7 +1682,7 @@ object Unidoc {
(JavaUnidoc / unidoc / unidocProjectFilter) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, kubernetes,
yarn, tags, streamingKafka010, sqlKafka010, connectCommon, connect, connectJdbc,
connectClient, connectShims, protobuf, profiler),
connectClient, connectShims, protobuf, profiler, udfWorkerProto, udfWorkerCore),
)
}

Expand Down
49 changes: 49 additions & 0 deletions udf/worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# udf/worker -- Language-agnostic UDF Worker Framework

Package structure for the UDF worker framework described in
[SPIP SPARK-55278](https://issues.apache.org/jira/browse/SPARK-55278).

## Overview

Spark processes a UDF by first obtaining a **WorkerDispatcher** from the worker
specification (plus context such as security scope). The dispatcher manages the
actual worker processes behind the scenes -- pooling, reuse, and termination are
all invisible to Spark.

From the dispatcher, Spark obtains a **WorkerSession**, which represents a single
UDF execution and can carry per-execution state. A WorkerSession does not map
1-to-1 to an actual worker -- multiple sessions may share the same underlying
worker when it is reused. Worker reuse is managed by each dispatcher
implementation based on the worker specification.

## Sub-packages

```
udf/worker/
├── proto/ Protobuf definition of the worker specification
│ (UDFWorkerSpecification -- currently a placeholder).
│ WorkerSpecification -- typed Scala wrapper around the protobuf spec.
└── core/ Engine-side APIs (all @Experimental):
WorkerDispatcher -- manages workers for one spec; creates sessions.
WorkerSession -- represents one single UDF execution.
WorkerSecurityScope -- security boundary for connection pooling.
```

## Build

SBT:
```
build/sbt "udf-worker-core/compile"
build/sbt "udf-worker-core/test"
```

Maven:
```
./build/mvn -pl udf/worker/proto,udf/worker/core -am compile
./build/mvn -pl udf/worker/proto,udf/worker/core -am test
```

## Design references

* [SPIP Language-agnostic UDF Protocol for Spark](https://docs.google.com/document/d/19Whzq127QxVt2Luk0EClgaDtcpBsFUp67NcVdKKyPF8/edit?tab=t.0)
* [SPIP Language-agnostic UDF Protocol for Spark -- Worker Specification](https://docs.google.com/document/d/1Dx9NqHRNuUpatH9DYoFF9cmvUl2fqHT4Rjbyw4EGLHs/edit?tab=t.0#heading=h.4h01j4b8rjzv)
66 changes: 66 additions & 0 deletions udf/worker/core/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<!-- Engine-side APIs of the language-agnostic UDF worker framework (SPARK-55278). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.13</artifactId>
<version>4.2.0-SNAPSHOT</version>
<!-- udf/worker/core is three levels below the repository root. -->
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>spark-udf-worker-core_2.13</artifactId>
<packaging>jar</packaging>
<name>Spark Project UDF Worker Core</name>
<url>https://spark.apache.org/</url>

<properties>
<!-- Must match the sbt project name declared in SparkBuild.scala. -->
<sbt.project.name>udf-worker-core</sbt.project.name>
</properties>

<dependencies>
<!-- Versions for spark-tags and scala-library are omitted here; presumably
they are managed by the parent pom's dependencyManagement section. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-udf-worker-proto_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
</dependencies>

<build>
<!-- Keep class output under the Scala-binary-versioned directory, like the
other Spark modules. -->
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core

import org.apache.spark.annotation.Experimental
import org.apache.spark.udf.worker.WorkerSpecification

/**
 * :: Experimental ::
 * Manages workers for a single [[WorkerSpecification]] and hides worker details from Spark.
 *
 * A [[WorkerDispatcher]] is created from a worker specification (plus context such
 * as security scope). It owns the underlying worker processes and connections,
 * handling pooling, reuse, and lifecycle behind the scenes. Spark interacts with
 * workers exclusively through the [[WorkerSession]]s returned by [[createSession]].
 */
@Experimental
trait WorkerDispatcher extends AutoCloseable {

  /** The worker specification this dispatcher was created for and manages workers of. */
  def workerSpec: WorkerSpecification

  /**
   * Creates a [[WorkerSession]] that maps to one single UDF execution.
   *
   * Sessions are not necessarily backed by dedicated worker processes: multiple
   * sessions may share one worker when the dispatcher decides to reuse it.
   *
   * @param securityScope identifies which pool of workers may be reused for this
   *                      session. Dispatcher implementations use the scope to
   *                      decide whether an existing worker can be shared or a new
   *                      one must be created.
   */
  def createSession(securityScope: Option[WorkerSecurityScope]): WorkerSession

  /** Releases everything the dispatcher owns (per the class doc: worker processes
   *  and connections). Re-declared here so implementations must provide it. */
  override def close(): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core

import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * Identifies a security boundary for worker connection pooling.
 *
 * Workers are only reused within the same security scope. Dispatcher
 * implementations compare scopes using `equals` to decide whether an
 * existing worker can be shared. Subclasses '''must''' override
 * `equals` and `hashCode` so that structurally equivalent scopes
 * match; otherwise, worker reuse will silently fail.
 */
@Experimental
abstract class WorkerSecurityScope {
  // Deliberately re-declared abstract: a concrete subclass cannot fall back to
  // AnyRef's reference equality, which would defeat structural scope matching.
  override def equals(obj: Any): Boolean
  override def hashCode(): Int
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core

import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * Represents a single UDF execution.
 *
 * A [[WorkerSession]] is obtained from [[WorkerDispatcher#createSession]] and
 * can carry per-execution state for that UDF invocation. Implementations may
 * add concrete data-processing methods and lifecycle hooks as needed.
 *
 * A WorkerSession is not 1-to-1 mapped to an actual worker process. Multiple
 * WorkerSessions may be backed by the same worker when the worker is reused.
 * Worker reuse and pooling are managed by each [[WorkerDispatcher]]
 * implementation based on the [[WorkerSpecification]].
 */
@Experimental
abstract class WorkerSession extends AutoCloseable {
  // No-op by default so sessions without per-execution resources need not
  // override it. Since workers may be shared across sessions, closing a
  // session presumably should not terminate the backing worker -- worker
  // lifecycle belongs to the dispatcher.
  override def close(): Unit = {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core

import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite

// Placeholder suite that wires the udf-worker-core module into the test build.
class WorkerAbstractionSuite
extends AnyFunSuite { // scalastyle:ignore funsuite

  // Intentionally empty: keeps "udf-worker-core/test" green until concrete
  // dispatcher/session implementations bring real coverage.
  test("dummy") {}
}
Loading