22 changes: 20 additions & 2 deletions pom.xml
@@ -24,7 +24,7 @@

<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-spark-connector_2.13</artifactId>
-<version>3.4.1-SNAPSHOT</version>
+<version>3.5.6-SNAPSHOT</version>
<name>StreamNative :: Pulsar Spark Connector</name>
<url>https://pulsar.apache.org</url>
<inceptionYear>2019</inceptionYear>
@@ -70,10 +70,11 @@
<scala.version>2.13.12</scala.version>
<scala.binary.version>2.13</scala.binary.version>
<scalatest.version>3.2.14</scalatest.version>
-<spark.version>3.4.1</spark.version>
+<spark.version>3.5.6</spark.version>
<commons-io.version>2.19.0</commons-io.version>
<testcontainers.version>1.18.3</testcontainers.version>
<bouncycastle.version>1.78</bouncycastle.version>
+<jackson.version>2.15.2</jackson.version>

<!-- plugin dependencies -->
<maven.version>3.5.4</maven.version>
@@ -92,6 +93,23 @@

<!-- dependencies for all modules -->
<dependencies>
+<!-- Jackson dependencies - ensure version consistency for tests -->
+<dependency>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-core</artifactId>
+<version>${jackson.version}</version>
+</dependency>
+<dependency>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-databind</artifactId>
+<version>${jackson.version}</version>
+</dependency>
+<dependency>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-annotations</artifactId>
+<version>${jackson.version}</version>
+</dependency>
+
<!-- pulsar dependency -->
<dependency>
<groupId>org.apache.pulsar</groupId>
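Pinning all three Jackson artifacts to one `${jackson.version}` keeps jackson-core, jackson-databind, and jackson-annotations aligned even when Spark and Pulsar pull different transitive versions; mismatched Jackson jars tend to surface as runtime mapping or linkage errors rather than compile failures. A minimal sketch of a fail-fast check (a hypothetical helper, not part of this PR; the object name is illustrative):

```scala
// Hypothetical sanity check, not part of this PR: confirm the Jackson
// artifacts resolved on the classpath carry the same version, so a
// transitive bump from Spark or Pulsar is caught before tests run.
import com.fasterxml.jackson.core.json.{PackageVersion => CoreVersion}
import com.fasterxml.jackson.databind.cfg.{PackageVersion => DatabindVersion}

object JacksonVersionCheck {
  def main(args: Array[String]): Unit = {
    val core     = CoreVersion.VERSION.toString     // version baked into jackson-core
    val databind = DatabindVersion.VERSION.toString // version baked into jackson-databind
    require(core == databind,
      s"jackson-core $core and jackson-databind $databind are out of sync")
    println(s"Jackson $core resolved consistently")
  }
}
```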
4 changes: 2 additions & 2 deletions src/main/scala/org/apache/spark/sql/pulsar/JSONParser.scala
@@ -367,15 +367,15 @@ class JacksonRecordParser(schema: DataType, val options: JSONOptions) extends Lo
case e @ (_: RuntimeException | _: JsonProcessingException | _: MalformedInputException) =>
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the meta fields are set to `null`.
-throw BadRecordException(() => recordLiteral(record), () => None, e)
+throw BadRecordException(() => recordLiteral(record), () => Array.empty[InternalRow], e)
case e: CharConversionException if options.encoding.isEmpty =>
val msg =
"""JSON parser cannot handle a character in its input.
|Specifying encoding as an input option explicitly might help to resolve the issue.
|""".stripMargin + e.getMessage
val wrappedCharException = new CharConversionException(msg)
wrappedCharException.initCause(e)
-throw BadRecordException(() => recordLiteral(record), () => None, wrappedCharException)
+throw BadRecordException(() => recordLiteral(record), () => Array.empty[InternalRow], wrappedCharException)
}
}
}
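The two hunks above track a Spark 3.5 change to the internal catalyst parser API: the second parameter of `BadRecordException` went from `() => Option[InternalRow]` in Spark 3.4 to `() => Array[InternalRow]`, so "no partial result" is now an empty array instead of `None`. A minimal sketch of the new shape (assuming Spark 3.5.x on the classpath; `raw`, `cause`, and the method name are illustrative, not from this PR):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.BadRecordException
import org.apache.spark.unsafe.types.UTF8String

// Raise a parse failure carrying the raw record and no partial results.
// Spark 3.4 expected `() => None` here; Spark 3.5 expects an
// Array[InternalRow], with an empty array meaning "nothing recovered".
def failOnCorruptRecord(raw: UTF8String, cause: Throwable): Nothing =
  throw BadRecordException(() => raw, () => Array.empty[InternalRow], cause)
```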
src/main/scala/org/apache/spark/sql/pulsar/PulsarProvider.scala
@@ -15,14 +15,13 @@ package org.apache.spark.sql.pulsar

import java.{util => ju}
import java.util.{Locale, UUID}
-
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.common.naming.TopicName
-
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.json.JSONOptionsInRead
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.pulsar.PulsarSourceUtils.reportDataLossFunc
@@ -206,7 +205,7 @@ private[pulsar] class PulsarProvider
val caseInsensitiveParams = validateSinkOptions(parameters)

val (clientConfig, producerConfig, topic) = prepareConfForProducer(parameters)
-PulsarSinks.validateQuery(data.schema.toAttributes, topic)
+PulsarSinks.validateQuery(DataTypeUtils.toAttributes(data.schema), topic)

PulsarSinks.write(
sqlContext.sparkSession,
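This hunk (and the matching one in `PulsarSink.scala` below) follows the relocation of schema-to-attribute conversion in Spark 3.5: `StructType.toAttributes` is no longer available, and the equivalent helper lives on `org.apache.spark.sql.catalyst.types.DataTypeUtils`. A minimal before/after sketch (assuming Spark 3.5.x; the schema literal is illustrative):

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(StructField("value", StringType)))

// Spark 3.4:  val attrs = schema.toAttributes
// Spark 3.5 moves the helper to DataTypeUtils:
val attrs: Seq[AttributeReference] = DataTypeUtils.toAttributes(schema)
```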
src/main/scala/org/apache/spark/sql/pulsar/PulsarSink.scala
@@ -15,16 +15,14 @@ package org.apache.spark.sql.pulsar

import java.{util => ju}
import java.util.concurrent.TimeUnit
-
import scala.util.control.NonFatal
-
import org.apache.pulsar.client.api.{Producer, PulsarClientException, Schema}
-
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.types._
@@ -44,7 +42,7 @@ private[pulsar] class PulsarSink(
override def toString: String = "PulsarSink"

override def addBatch(batchId: Long, data: DataFrame): Unit = {
-PulsarSinks.validateQuery(data.schema.toAttributes, topic)
+PulsarSinks.validateQuery(DataTypeUtils.toAttributes(data.schema), topic)

if (batchId <= latestBatchId) {
logInfo(s"Skipping already committed batch $batchId")