Commit fe91aa0

Build with eBay internal Spark

1 parent acc27a4 commit fe91aa0
24 files changed: +330 −99 lines changed
Lines changed: 137 additions & 0 deletions
@@ -0,0 +1,137 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Build eBay bundle package

env:
  ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true
  M2_REPOSITORY: '/root/.m2/repository/'

on:
  push:
    branches:
      - 'ebay-build**'
    tags:
      - 'ebay-build**'

jobs:
  build-native-lib:
    runs-on: ubuntu-22.04
    env:
      OAUTH_KEY: ${{ secrets.OAUTH_KEY }}
    steps:
      - uses: actions/checkout@v4
      - name: Build Gluten velox third party
        run: |
          docker run -v $GITHUB_WORKSPACE:/work -w /work apache/gluten:vcpkg-centos-7 bash -c "
            df -a
            cd /work
            bash dev/ci-velox-buildstatic-centos-7.sh
            mkdir -p /work/.m2/repository/org/apache/arrow/
            cp -r $M2_REPOSITORY/org/apache/arrow arrow-package
          "
      - name: Upload native libs
        uses: actions/upload-artifact@v4
        with:
          path: ./cpp/build/releases/
          name: velox-native-lib-${{github.sha}}
          retention-days: 1
          if-no-files-found: error
      - name: Upload Artifact Arrow Jar
        uses: actions/upload-artifact@v4
        with:
          path: ./arrow-package
          name: velox-arrow-jar-centos-7-${{github.sha}}
          retention-days: 1
          if-no-files-found: error

  build-bundle-package-centos8:
    needs: build-native-lib
    runs-on: ubuntu-22.04
    env:
      OAUTH_KEY: ${{ secrets.OAUTH_KEY }}
    container: centos:8
    steps:
      - uses: actions/checkout@v1
        with:
          repository: wangyum/packages
          ref: master
          token: ${{ secrets.EBAY_TOKEN }}
      - name: Install packages to $M2_REPOSITORY
        run: |
          mkdir -p $M2_REPOSITORY
          rm -rf $M2_REPOSITORY/io && rm -rf $M2_REPOSITORY/org
          mv ../packages/* $M2_REPOSITORY
          cd $M2_REPOSITORY && find . -name "_*.repositories" | xargs rm -rf # Fix Could not find artifact io.ebay.rheos ...
      - uses: actions/checkout@v2
      - name: Download All Artifacts
        uses: actions/download-artifact@v4
        with:
          name: velox-native-lib-${{github.sha}}
          path: ./cpp/build/releases
      - name: Download All Arrow Jar Artifacts
        uses: actions/download-artifact@v4
        with:
          name: velox-arrow-jar-centos-7-${{github.sha}}
          path: /root/.m2/repository/org/apache/arrow/
      - name: Setup java and maven
        run: |
          sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* && \
          sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* && \
          yum update -y && yum install -y java-1.8.0-openjdk-devel wget && \
          wget https://dlcdn.apache.org/maven/maven-3/3.8.9/binaries/apache-maven-3.8.9-bin.tar.gz && \
          tar -xvf apache-maven-3.8.9-bin.tar.gz && \
          mv apache-maven-3.8.9 /usr/lib/maven
      - uses: actions/checkout@v1
        with:
          repository: wangyum/ebay-spark
          ref: gluten-build
          token: ${{ secrets.EBAY_TOKEN }}
      - name: Install ebay-spark to $M2_REPOSITORY
        run: |
          cd ../ebay-spark && build/mvn clean install -Dhadoop.version=3.3.3.1.0.39 -DskipTests=true -Dmaven.javadoc.skip=true
      - name: Build for eBay Spark 3.5
        run: |
          cd $GITHUB_WORKSPACE/ && \
          export MAVEN_HOME=/usr/lib/maven && \
          export PATH=${PATH}:${MAVEN_HOME}/bin && \
          mvn clean install -Pspark-3.5 -Dhadoop.version=3.3.3.1.0.39 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DskipTests=true -Dmaven.javadoc.skip=true -Dmaven.scaladoc.skip=true -Dmaven.source.skip -Dcyclonedx.skip=true
          ls -lh package/target/
      - name: Upload bundle package
        uses: actions/upload-artifact@v4
        with:
          name: gluten-velox-bundle-package
          path: package/target/gluten-velox-bundle-*.jar
          retention-days: 3
          if-no-files-found: error
      - name: Install ebay-spark to $M2_REPOSITORY with scala 2.13
        run: |
          cd ../ebay-spark && ./dev/change-scala-version.sh 2.13 && build/mvn clean install -Pscala-2.13 -Dhadoop.version=3.3.3.1.0.39 -DskipTests=true -Dmaven.javadoc.skip=true
      - name: Build for eBay Spark 3.5 with scala 2.13
        run: |
          cd $GITHUB_WORKSPACE/ && \
          export MAVEN_HOME=/usr/lib/maven && \
          export PATH=${PATH}:${MAVEN_HOME}/bin && \
          export SPARK_SCALA_VERSION=2.13 && \
          mvn clean install -Pspark-3.5 -Pscala-2.13 -Dhadoop.version=3.3.3.1.0.39 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -DskipTests=true -Dmaven.javadoc.skip=true -Dmaven.scaladoc.skip=true -Dmaven.source.skip -Dcyclonedx.skip=true
          ls -lh package/target/
          md5sum package/target/gluten-velox-bundle-spark3.5*
      - name: Upload bundle package with scala 2.13
        uses: actions/upload-artifact@v4
        with:
          name: gluten-velox-bundle-package-scala2.13
          path: package/target/gluten-velox-bundle-*.jar
          retention-days: 3
          if-no-files-found: error

.idea/vcs.xml

Lines changed: 3 additions & 21 deletions
Some generated files are not rendered by default.

backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala

Lines changed: 2 additions & 2 deletions
@@ -154,7 +154,7 @@ class VeloxColumnarWriteFilesRDD(
       } else {
         Some(
           WriteTaskResult(
-            new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
+            new TaskCommitMessage(addedAbsPathFiles.toMap, updatedPartitions, numFiles),
             summary))
       }
     }
@@ -277,7 +277,7 @@ case class VeloxColumnarWriteFilesExec private (
     if (rdd.partitions.length == 0) {
       // SPARK-23271 If we are attempting to write a zero partition rdd, create a dummy single
       // partition rdd to make sure we at least set up one write task to write the metadata.
-      writeFilesForEmptyRDD(description, committer, jobTrackerID)
+      writeFilesForEmptyRDD(description, committer, jobTrackerID, writeFilesSpec)
     } else {
       new VeloxColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
     }

dev/build_arrow.sh

Lines changed: 1 addition & 2 deletions
@@ -26,8 +26,7 @@ BUILD_TYPE=Release

 function prepare_arrow_build() {
   mkdir -p ${ARROW_PREFIX}/../ && pushd ${ARROW_PREFIX}/../ && sudo rm -rf arrow_ep/
-  wget_and_untar https://github.com/apache/arrow/archive/refs/tags/apache-arrow-${VELOX_ARROW_BUILD_VERSION}.tar.gz arrow_ep
-  #wget_and_untar https://archive.apache.org/dist/arrow/arrow-${VELOX_ARROW_BUILD_VERSION}/apache-arrow-${VELOX_ARROW_BUILD_VERSION}.tar.gz arrow_ep
+  git clone -b apache-arrow-15.0.0 https://github.com/apache/arrow.git arrow_ep
   cd arrow_ep
   patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow.patch
   patch -p1 < $CURRENT_DIR/../ep/build-velox/src/modify_arrow_dataset_scan_option.patch

dev/ci-velox-buildstatic-centos-7.sh

Lines changed: 1 addition & 1 deletion
@@ -19,5 +19,5 @@ set -e

 source /opt/rh/devtoolset-11/enable
 export NUM_THREADS=4
-./dev/builddeps-veloxbe.sh --enable_vcpkg=ON --build_arrow=OFF --build_tests=OFF --build_benchmarks=OFF \
+./dev/builddeps-veloxbe.sh --enable_vcpkg=ON --build_arrow=ON --build_tests=OFF --build_benchmarks=OFF \
   --build_examples=OFF --enable_s3=ON --enable_gcs=ON --enable_hdfs=ON --enable_abfs=ON

gluten-core/src/main/scala/org/apache/gluten/config/GlutenCoreConfig.scala

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ object GlutenCoreConfig {
       .doc("Whether to enable gluten. Default value is true. Just an experimental property." +
         " Recommend to enable/disable Gluten through the setting for spark.plugins.")
       .booleanConf
-      .createWithDefault(true)
+      .createWithDefault(false)

   // Options used by RAS.
   val RAS_ENABLED =
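Note: with this change the Gluten enabled flag defaults to false in this build, so jobs have to opt in explicitly. Below is a minimal opt-in sketch, assuming the upstream key name spark.gluten.enabled, the plugin class org.apache.gluten.GlutenPlugin, and the usual off-heap requirements from the Gluten docs; none of these names appear in this diff.

import org.apache.spark.sql.SparkSession

object GlutenOptInSketch extends App {
  val spark = SparkSession.builder()
    .appName("gluten-opt-in-sketch")
    // Load the Gluten plugin; class name taken from upstream Gluten docs, not from this diff.
    .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
    // Opt in explicitly now that createWithDefault is false in this build.
    .config("spark.gluten.enabled", "true")
    // Off-heap memory is required by the Velox backend (sizes here are placeholders).
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .getOrCreate()

  spark.range(10).count() // trivial query to confirm the session comes up
  spark.stop()
}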

gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala

Lines changed: 2 additions & 0 deletions
@@ -57,6 +57,8 @@ trait GlutenPlan
     rowType() != Convention.RowType.None
   }

+  final override val supportsVectorExecution: Boolean = true
+
   override def batchType(): Convention.BatchType

   override def rowType0(): Convention.RowType

gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala

Lines changed: 3 additions & 3 deletions
@@ -963,15 +963,15 @@ object GlutenConfig {
       .doc("The threshold to determine whether to use sort-based columnar shuffle. Sort-based " +
         "shuffle will be used if the number of partitions is greater than this threshold.")
       .intConf
-      .createWithDefault(4000)
+      .createWithDefault(0)

   val COLUMNAR_SHUFFLE_SORT_COLUMNS_THRESHOLD =
     buildConf("spark.gluten.sql.columnar.shuffle.sort.columns.threshold")
       .internal()
       .doc("The threshold to determine whether to use sort-based columnar shuffle. Sort-based " +
         "shuffle will be used if the number of columns is greater than this threshold.")
       .intConf
-      .createWithDefault(100000)
+      .createWithDefault(0)

   val COLUMNAR_TABLE_CACHE_ENABLED =
     buildConf("spark.gluten.sql.columnar.tableCache")
@@ -1559,7 +1559,7 @@ object GlutenConfig {
       .internal()
       .doc("If enabled, gluten will convert the viewfs path to hdfs path in scala side")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)

   val ENCRYPTED_PARQUET_FALLBACK_ENABLED =
     buildConf("spark.gluten.sql.fallbackEncryptedParquet")

gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala

Lines changed: 13 additions & 2 deletions
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.catalog.functions.Reducer
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -42,7 +43,9 @@ case class BatchScanExecTransformer(
     override val keyGroupedPartitioning: Option[Seq[Expression]] = None,
     override val ordering: Option[Seq[SortOrder]] = None,
     @transient override val table: Table,
+    override val joinKeyPositions: Option[Seq[Int]] = None,
     override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
+    override val reducers: Option[Seq[Option[Reducer[_, _]]]] = None,
     override val applyPartialClustering: Boolean = false,
     override val replicatePartitions: Boolean = false)
   extends BatchScanExecTransformerBase(
@@ -52,9 +55,12 @@ case class BatchScanExecTransformer(
     keyGroupedPartitioning,
     ordering,
     table,
+    joinKeyPositions,
     commonPartitionValues,
+    reducers,
     applyPartialClustering,
-    replicatePartitions) {
+    replicatePartitions
+  ) {

   protected[this] def supportsBatchScan(scan: Scan): Boolean = {
     scan.isInstanceOf[FileScan]
@@ -77,7 +83,9 @@ abstract class BatchScanExecTransformerBase(
     override val keyGroupedPartitioning: Option[Seq[Expression]] = None,
     override val ordering: Option[Seq[SortOrder]] = None,
     @transient override val table: Table,
+    override val joinKeyPositions: Option[Seq[Int]] = None,
     override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
+    override val reducers: Option[Seq[Option[Reducer[_, _]]]] = None,
     override val applyPartialClustering: Boolean = false,
     override val replicatePartitions: Boolean = false)
   extends BatchScanExecShim(
@@ -87,9 +95,12 @@ abstract class BatchScanExecTransformerBase(
     keyGroupedPartitioning,
     ordering,
     table,
+    joinKeyPositions,
     commonPartitionValues,
+    reducers,
     applyPartialClustering,
-    replicatePartitions)
+    replicatePartitions
+  )
   with BasicScanExecTransformer {

   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.

gluten-substrait/src/main/scala/org/apache/gluten/utils/InputPartitionsUtil.scala

Lines changed: 32 additions & 12 deletions
@@ -21,7 +21,9 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.read.InputPartition
+import org.apache.spark.sql.execution.PartitionedFileUtil
 import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory}
+import org.apache.spark.sql.execution.datasources.FilePartition.{maxSplitBytesBySpecifiedNum, minPartitionNumBySpecifiedSize}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet

@@ -47,9 +49,30 @@ case class InputPartitionsUtil(
   }

   private def genNonBuckedInputPartitionSeq(): Seq[InputPartition] = {
+    val originSize = FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
     val openCostInBytes = relation.sparkSession.sessionState.conf.filesOpenCostInBytes
     val maxSplitBytes =
-      FilePartition.maxSplitBytes(relation.sparkSession, selectedPartitions)
+      if (
+        relation.sparkSession.sessionState.conf.bucketingEnabled &&
+        relation.bucketSpec.isDefined
+      ) {
+        val partitionNum =
+          minPartitionNumBySpecifiedSize(relation.sparkSession, selectedPartitions, originSize)
+        val bucketNum = math.max(
+          relation.bucketSpec.get.numBuckets,
+          relation.sparkSession.sessionState.conf.numShufflePartitions)
+        val maxBucketScanParts = relation.sparkSession.sessionState.conf.filesMaxPartitionNum
+          .map(_.min(bucketNum))
+          .getOrElse(bucketNum)
+        if (partitionNum > maxBucketScanParts) {
+          maxSplitBytesBySpecifiedNum(relation.sparkSession, selectedPartitions, maxBucketScanParts)
+        } else {
+          originSize
+        }
+      } else {
+        originSize
+      }
+
     logInfo(
       s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
         s"open cost is considered as scanning $openCostInBytes bytes.")
@@ -73,22 +96,19 @@ case class InputPartitionsUtil(
     val splitFiles = selectedPartitions
       .flatMap {
         partition =>
-          SparkShimLoader.getSparkShims.getFileStatus(partition).flatMap {
+          partition.files.flatMap {
             file =>
               // getPath() is very expensive so we only want to call it once in this block:
-              val filePath = file._1.getPath
+              val filePath = file.path
               if (shouldProcess(filePath)) {
                 val isSplitable =
                   SparkShimLoader.getSparkShims.isFileSplittable(relation, filePath, requiredSchema)
-                SparkShimLoader.getSparkShims.splitFiles(
-                  sparkSession = relation.sparkSession,
-                  file = file._1,
-                  filePath = filePath,
-                  isSplitable = isSplitable,
-                  maxSplitBytes = maxSplitBytes,
-                  partitionValues = partition.values,
-                  metadata = file._2
-                )
+                PartitionedFileUtil.splitFiles(
+                  relation.sparkSession,
+                  file,
+                  isSplitable,
+                  maxSplitBytes,
+                  partition.values)
               } else {
                 Seq.empty
               }
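Note: the new branch above caps how many scan splits a bucketed table can produce. The following standalone arithmetic sketch illustrates the effect with hypothetical numbers that do not come from this commit, and it ignores the per-file open cost that the real helpers account for.

object BucketedSplitSizeSketch extends App {
  val totalBytes           = 512L * 1024 * 1024 * 1024 // 512 GiB of selected files (assumed)
  val originSize           = 128L * 1024 * 1024        // default size-based split: 128 MiB
  val numBuckets           = 1000                      // bucket count of the table (assumed)
  val numShufflePartitions = 2000                      // spark.sql.shuffle.partitions (assumed)

  // Roughly what minPartitionNumBySpecifiedSize would report for this input size.
  val partitionNum = totalBytes / originSize            // 4096 size-based splits

  // No spark.sql.files.maxPartitionNum cap assumed, so the bucket-derived cap wins.
  val maxBucketScanParts = math.max(numBuckets, numShufflePartitions) // 2000

  // When the size-based count exceeds the cap, splits are enlarged to land near the cap.
  val maxSplitBytes =
    if (partitionNum > maxBucketScanParts) totalBytes / maxBucketScanParts // ~262 MiB per split
    else originSize

  println(s"size-based splits = $partitionNum, cap = $maxBucketScanParts, maxSplitBytes = $maxSplitBytes")
}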
