Conversation

nlu90 (Collaborator) commented Jul 10, 2025

Motivation

Explain the context and why you're making this change. What problem are you trying to solve?

Modifications

Describe the modifications you've made.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick one of the following options)

  • This change is a trivial rework / code cleanup without any test coverage.

  • This change is already covered by existing tests, such as:

  • This change added tests and can be verified as follows:

Documentation

Check the box below.

Need to update docs?

  • doc-required
  • no-need-doc
  • doc

nlu90 requested a review from a team as a code owner July 10, 2025 04:10
github-actions bot added the no-need-doc label (this PR does not need any documentation) Jul 10, 2025
freeznet previously approved these changes Jul 10, 2025
nlu90 (Collaborator, Author) commented Jul 11, 2025

Test Result
➜  pulsar-spark git:(neng/upgrade-spark-3.5.6) ✗ mvn test          
[INFO] Scanning for projects...
[INFO] 
[INFO] -------< io.streamnative.connectors:pulsar-spark-connector_2.13 >-------
[INFO] Building StreamNative :: Pulsar Spark Connector 3.5.6-SNAPSHOT
[INFO]   from pom.xml
[INFO] --------------------------------[ jar ]---------------------------------
[WARNING] 2 problems were encountered while building the effective model for org.apache.yetus:audience-annotations:jar:0.5.0 during dependency collection step for project (use -X to see details)
[INFO] 
[INFO] --- enforcer:1.4.1:enforce (enforce-maven-version) @ pulsar-spark-connector_2.13 ---
[INFO] 
[INFO] --- enforcer:1.4.1:enforce (enforce-java-version) @ pulsar-spark-connector_2.13 ---
[INFO] 
[INFO] --- enforcer:1.4.1:enforce (enforce-output-timestamp-property) @ pulsar-spark-connector_2.13 ---
[INFO] 
[INFO] --- scalafmt:1.1.1640084764.9f463a9:format (default) @ pulsar-spark-connector_2.13 ---
[WARNING] format.skipSources set, ignoring main directories
[WARNING] format.skipTestSources set, ignoring validateOnly directories
[WARNING] No sources specified, skipping formatting
[INFO] 
[INFO] --- scala:4.8.1:add-source (eclipse-add-source) @ pulsar-spark-connector_2.13 ---
[INFO] Add Source directory: /Users/nlu/work/streamnative/pulsar-spark/src/main/scala
[INFO] Add Test Source directory: /Users/nlu/work/streamnative/pulsar-spark/src/test/scala
[INFO] 
[INFO] --- remote-resources:1.7.0:process (process-resource-bundles) @ pulsar-spark-connector_2.13 ---
[INFO] Preparing remote bundle org.apache:apache-jar-resource-bundle:1.4
[INFO] Copying 3 resources from 1 bundle.
[INFO] 
[INFO] --- resources:3.2.0:resources (default-resources) @ pulsar-spark-connector_2.13 ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Using 'UTF-8' encoding to copy filtered properties files.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO] 
[INFO] --- compiler:3.10.1:compile (default-compile) @ pulsar-spark-connector_2.13 ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- scala:4.8.1:compile (scala-compile-first) @ pulsar-spark-connector_2.13 ---
[WARNING]  Expected all dependencies to require Scala version: 2.13.12
[WARNING]  org.apache.spark:spark-core_2.13:3.5.6 requires scala version: 2.13.8
[WARNING] Multiple versions of scala libraries detected!
[INFO] Compiler bridge file: /Users/nlu/.sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.13-1.8.0-bin_2.13.12__55.0-1.8.0_20221110T195421.jar
[INFO] compile in 1.7 s
[INFO] 
[INFO] --- resources:3.2.0:testResources (default-testResources) @ pulsar-spark-connector_2.13 ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Using 'UTF-8' encoding to copy filtered properties files.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO] 
[INFO] --- compiler:3.10.1:testCompile (default-testCompile) @ pulsar-spark-connector_2.13 ---
[INFO] Nothing to compile - all classes are up to date
[INFO] 
[INFO] --- scala:4.8.1:testCompile (scala-test-compile-first) @ pulsar-spark-connector_2.13 ---
[WARNING]  Expected all dependencies to require Scala version: 2.13.12
[WARNING]  org.apache.spark:spark-core_2.13:3.5.6 requires scala version: 2.13.8
[WARNING] Multiple versions of scala libraries detected!
[INFO] Compiler bridge file: /Users/nlu/.sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.13-1.8.0-bin_2.13.12__55.0-1.8.0_20221110T195421.jar
[INFO] compile in 1.7 s
[INFO] 
[INFO] --- surefire:2.22.2:test (default-test) @ pulsar-spark-connector_2.13 ---
[INFO] 
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] 
[INFO] Results:
[INFO] 
[INFO] Tests run: 0, Failures: 0, Errors: 0, Skipped: 0
[INFO] 
[INFO] 
[INFO] --- scalatest:2.2.0:test (test) @ pulsar-spark-connector_2.13 ---
[INFO] ScalaTest report directory: /Users/nlu/work/streamnative/pulsar-spark/target/surefire-reports
Discovery starting.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Discovery completed in 1 second, 106 milliseconds.
Run starting. Expected test count is: 121
PulsarMicroBatchV1SourceSuite:
SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".
SLF4J: Defaulting to no-operation MDCAdapter implementation.
SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/Users/nlu/.m2/repository/org/apache/spark/spark-unsafe_2.13/3.5.6/spark-unsafe_2.13-3.5.6.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
- cannot stop Pulsar stream
- assign from latest offsets (failOnDataLoss: true)
- assign from earliest offsets (failOnDataLoss: true)
- assign from time (failOnDataLoss: true)
- assign from specific offsets (failOnDataLoss: true)
- subscribing topic by name from latest offsets (failOnDataLoss: true)
- subscribing topic by name from earliest offsets (failOnDataLoss: true)
- subscribing topic by name from specific offsets (failOnDataLoss: true)
- subscribing topic by pattern from latest offsets (failOnDataLoss: true)
- subscribing topic by pattern from earliest offsets (failOnDataLoss: true)
- subscribing topic by pattern from specific offsets (failOnDataLoss: true)
- assign from latest offsets (failOnDataLoss: false)
- assign from earliest offsets (failOnDataLoss: false)
- assign from time (failOnDataLoss: false)
- assign from specific offsets (failOnDataLoss: false)
- subscribing topic by name from latest offsets (failOnDataLoss: false)
- subscribing topic by name from earliest offsets (failOnDataLoss: false)
- subscribing topic by name from specific offsets (failOnDataLoss: false)
- subscribing topic by pattern from latest offsets (failOnDataLoss: false)
- subscribing topic by pattern from earliest offsets (failOnDataLoss: false)
- subscribing topic by pattern from specific offsets (failOnDataLoss: false)
- bad source options
- get offsets from case insensitive parameters !!! IGNORED !!!
- Pulsar column types
- test boolean stream
- test int stream
- test string stream
- test byte stream
- test double stream
- test float stream
- test short stream
- test long stream
- test byte array stream
- test date stream
- test timestamp stream
- test struct types in avro
- test struct types in json
- test with batched messages
- test with non-batched messages
- test with batched and non-batched messages
- (de)serialization of initial offsets
- input row metrics
- subscribing topic by pattern with topic deletions
- subscribe topic by pattern with topic recreation between batches
- PulsarSource with watermark
- delete a topic when a Spark job is running
- ensure stream-stream self-join generates only one offset in log and correct metrics
- V1 Source is used by default
PulsarSourceOffsetSuite:
- comparison SpecificPulsarOffset(t: 1:1:-1 (class org.apache.pulsar.client.impl.MessageIdImpl) <=> SpecificPulsarOffset(t: 1:2:-1 (class org.apache.pulsar.client.impl.MessageIdImpl)
- comparison SpecificPulsarOffset(t: 1:1:-1 (class org.apache.pulsar.client.impl.MessageIdImpl, t1: 1:1:-1 (class org.apache.pulsar.client.impl.MessageIdImpl) <=> SpecificPulsarOffset(t: 1:2:-1 (class org.apache.pulsar.client.impl.MessageIdImpl, t1: 1:2:-1 (class org.apache.pulsar.client.impl.MessageIdImpl)
- comparison SpecificPulsarOffset(t: 1:1:-1 (class org.apache.pulsar.client.impl.MessageIdImpl) <=> SpecificPulsarOffset(t: 1:2:-1 (class org.apache.pulsar.client.impl.MessageIdImpl, t1: 1:1:-1 (class org.apache.pulsar.client.impl.MessageIdImpl)
- comparison SpecificPulsarOffset(t: 1:1:-1 (class org.apache.pulsar.client.impl.MessageIdImpl) <=> SpecificPulsarOffset(t: 1:2:-1 (class org.apache.pulsar.client.impl.MessageIdImpl, t1: 1:3:-1 (class org.apache.pulsar.client.impl.MessageIdImpl)
- basic serialization - deserialization
- OffsetSeqLog serialization - deserialization
ConnectorInitializationSuite:
- connector initialization test
PulsarMicroBatchSourceTriggerAvailableNowSuite:
- assign from latest offsets: available now
- assign from earliest offsets: available now
- assign from time: available now
- assign from specific offsets: available now
- subscribing topic by name from latest offsets: available now
- subscribing topic by name from earliest offsets: available now
- subscribing topic by name from specific offsets: available now
- subscribing topic by pattern from latest offsets: available now
- subscribing topic by pattern from earliest offsets: available now
- subscribing topic by pattern from specific offsets: available now
JsonUtilsTestSuite:
- serialize and deserialize topics
- serialize and deserialize topic earliest offsets
- serialize and deserialize topic latest offsets
- serialize and deserialize topic specific offsets
- serialize and deserialize topic specific batch offsets
- serialize and deserialize topic starting times
PulsarAdmissionControlSuite:
- Only admit first entry of ledger
- setting invalid config PulsarAdmin
- Admit entry in the middle of the ledger
- Check last batch where message size is greater than maxBytesPerTrigger
- Admission Control for multiple topics
- Admission Control for concurrent topic writes
- Set invalid configuration for PulsarAdmin in stream options
- Admission Control with one topic-partition
- Admission Control with multiple topic-partitions
- Add topic-partition after starting stream
PulsarClientFactorySuite:
- Set Pulsar client factory class
- Unset Pulsar client factory class
PulsarConfigUpdaterSuite:
- set should always set value
- setIfUnset without existing key should set value
- setIfUnset with existing key should not set value
PulsarSourceTest:
PulsarSourceTTLSuite:
- test data loss with topic deletion across batches
SchemaInfoSerDeSuite:
- serialized schemaInfo serde
PulsarSinkSuite:
- batch - write to pulsar
- batch - boolean
- batch - int
- batch - string
- batch - byte
- batch - double
- batch - float
- batch - short
- batch - long
- batch - date
- batch - timestamp
- batch - byte array
- batch - struct types
- batch - null topic field value, and no topic option
- batch - unsupported save modes
- streaming - write to pulsar with topic field
- streaming - write aggregation w/o topic field, with topic option
- streaming - aggregation with topic field and topic option
- streaming - write bool
- streaming - write int
- streaming - write string
- streaming - write byte
- streaming - write double
- streaming - write float
- streaming - write short
- streaming - write long
- streaming - write byte array
- streaming - write date
- streaming - write timestamp
- streaming - write struct type
- streaming - write data with bad schema
- streaming - write data with valid schema but wrong types
- batch - write to pulsar with producer conf case sensitive
- streaming - write to pulsar with producer case sensitive conf
- generic - write big data with small producer buffer !!! IGNORED !!!
Run completed in 16 minutes, 11 seconds.
Total number of tests run: 121
Suites: completed 13, aborted 0
Tests: succeeded 121, failed 0, canceled 0, ignored 2, pending 0
All tests passed.
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  16:18 min
[INFO] Finished at: 2025-07-10T16:53:26-07:00
[INFO] ------------------------------------------------------------------------

freeznet previously approved these changes Jul 11, 2025
steventang2013 commented:

Hi @nlu90, is this ready to be merged?
We're looking to adopt pulsar-spark with spark 3.5+ very soon.

nlu90 merged commit 03708f3 into master Aug 26, 2025
1 check passed
5 participants