Skip to content

Feature/spline 815 export import #1436

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Jul 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4d1039e
spline #815 Add sample lineage data
wajda Nov 17, 2024
702d763
spline #815 - Export/Import
wajda Jun 14, 2025
bc7821b
spline #815 - Working REST "GET consumer/execution-plans/{id}"
wajda Jun 28, 2025
a3cdbb2
spline #815 - Working AQL query
wajda Jun 28, 2025
1c4474f
spline #815 - Working VPack deserialization
wajda Jun 28, 2025
a8a45ed
spline #815 - Add `_id` mapping to the persistent `Vertex` entity.
wajda Jun 29, 2025
c69de7f
spline #815 - Working assembling `Operations` entity (with pointers t…
wajda Jun 29, 2025
9075475
spline #815 - First fully working version of the `GET /producer/execu…
wajda Jul 5, 2025
7bf5c24
spline #815 - Working version of the `GET /producer/execution-plans/`…
wajda Jul 5, 2025
ae155eb
spline #815 - Working version of the `GET /producer/execution-plans/e…
wajda Jul 5, 2025
e44505d
spline #815 - Do not serialize `None` values.
wajda Jul 5, 2025
ca389b9
spline #815 - Add integration tests.
wajda Jul 8, 2025
47fa766
spline #815 - Add integration tests.
wajda Jul 11, 2025
51f9149
spline #815 - Add integration tests.
wajda Jul 11, 2025
93a6d2d
spline #815 - Add integration tests.
wajda Jul 11, 2025
9991892
Update producer-services/src/main/scala/za/co/absa/spline/producer/se…
wajda Jul 12, 2025
0840634
spline #815 - Addressing PR comments
wajda Jul 12, 2025
e60510e
spline #815 - Fix CoPilot's BS
wajda Jul 12, 2025
be07bd9
spline #815 - Upgrade some dependencies
wajda Jul 12, 2025
bbb5af2
spline #815 - Fix: `lineage-import` should support both array or a si…
wajda Jul 14, 2025
79a2a18
spline #815 - Fix: Event object assembler to create ExpressionRef ins…
wajda Jul 14, 2025
f20ec87
spline #815 - Integration tests :: wait longer for the container to s…
wajda Jul 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
<artifactId>persistence</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>za.co.absa.spline</groupId>
<artifactId>producer-rest-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.github.scopt</groupId>
<artifactId>scopt_${scala.compat.version}</artifactId>
Expand Down
138 changes: 134 additions & 4 deletions admin/src/main/scala/za/co/absa/spline/admin/AdminCLI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,35 @@
package za.co.absa.spline.admin

import ch.qos.logback.classic.{Level, Logger}
import org.apache.http.Consts
import org.apache.http.entity.ContentType
import org.slf4j.Logger.ROOT_LOGGER_NAME
import org.slf4j.LoggerFactory
import org.slf4s.Logging
import scopt.{OptionDef, OptionParser}
import za.co.absa.spline.admin.AdminCLI.AdminCLIConfig
import za.co.absa.spline.common.ConsoleUtils._
import za.co.absa.spline.common.SplineBuildInfo
import za.co.absa.spline.common.rest.RESTClientApacheHttpImpl
import za.co.absa.spline.common.scala13.Option
import za.co.absa.spline.common.security.TLSUtils
import za.co.absa.spline.persistence.AuxiliaryDBAction._
import za.co.absa.spline.persistence.DefaultJsonSerDe._
import za.co.absa.spline.persistence.OnDBExistsAction.{Drop, Fail, Skip}
import za.co.absa.spline.persistence.{ArangoConnectionURL, ArangoManagerFactory, ArangoManagerFactoryImpl}
import za.co.absa.spline.producer.rest.ProducerAPI

import scala.concurrent.Await
import java.io.File
import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.jdk.CollectionConverters._

object AdminCLI extends App {

import scala.concurrent.ExecutionContext.Implicits._

case class AdminCLIConfig(
cmd: Command = null,
logLevel: Level = Level.INFO,
Expand Down Expand Up @@ -63,7 +73,7 @@ object AdminCLI extends App {
new AdminCLI(dbManagerFactory).exec(args)
}

class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
class AdminCLI(dbManagerFactory: ArangoManagerFactory) extends Logging {

def exec(args: Array[String]): Unit = {

Expand Down Expand Up @@ -149,6 +159,38 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
children (this.dbCommandOptions: _*)
)

this.placeNewLine()

(cmd("lineage-import")
action ((_, c) => c.copy(cmd = LineageImport()))
text "Import lineage data files into the Spline database"
children(
opt[File]("dir")
text "Path to the directory containing the lineage data files to import."
required()
action { case (dir, c@AdminCLIConfig(cmd: LineageImport, _, _)) => c.copy(cmd.copy(lineageDumpPath = dir)) },
opt[URL]("producer-url")
text "Producer API base URL to which the lineage data files will be posted."
required()
action { case (url, c@AdminCLIConfig(cmd: LineageImport, _, _)) => c.copy(cmd.copy(producerApiUrl = url)) }
))

this.placeNewLine()

(cmd("lineage-export")
action ((_, c) => c.copy(cmd = LineageExport()))
text "Export lineage data files from the Spline database"
children(
opt[File]("dir")
text "Path to the directory where the lineage data files will be exported."
required()
action { case (dir, c@AdminCLIConfig(cmd: LineageExport, _, _)) => c.copy(cmd.copy(lineageDumpPath = dir)) },
opt[URL]("producer-url")
text "Producer API base URL from which the lineage data files will be fetched."
required()
action { case (url, c@AdminCLIConfig(cmd: LineageExport, _, _)) => c.copy(cmd.copy(producerApiUrl = url)) }
))

checkConfig {
case AdminCLIConfig(null, _, _) =>
failure("No command given")
Expand Down Expand Up @@ -187,6 +229,94 @@ class AdminCLI(dbManagerFactory: ArangoManagerFactory) {
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
Await.result(dbManager.upgrade(), Duration.Inf)

case LineageImport(producerApiBaseUrl, path) =>
val dir = path.toPath

val restClient = new RESTClientApacheHttpImpl(
uri = producerApiBaseUrl.toURI,
maybeSslContext = sslCtxOpt,
maybeCredentials = None
)

def process(pattern: String, url: String, fileContentToBodyFn: String => String): Future[Int] = {
val dirStream = Files.newDirectoryStream(dir, pattern)
try {
dirStream.asScala
.map(_.toFile)
.filter(_.isFile)
.foldLeft(Future.successful(0)) { (prevFut, file) =>
prevFut.flatMap { n =>
val rawJsonStr = Files.readString(file.toPath).trim
restClient.post(
path = url,
body = fileContentToBodyFn(rawJsonStr),
contentType = ContentType.create(ProducerAPI.MimeTypeV1_1, Consts.UTF_8)
).map(_ => n + 1)
}
}
} finally {
dirStream.close()
}
}

val resFuture = for {
nPlans <- process("plan-*.json", "execution-plans", identity)
_ <- process("event-*.json", "execution-events", s => if (s startsWith "[") s else s"[$s]")
} yield nPlans

val nPlans = Await.result(resFuture, Duration.Inf)
println(ansi"%green{Imported $nPlans execution plans with events from $path}")

case LineageExport(producerApiBaseUrl, path) =>
path.mkdirs()

val restClient = new RESTClientApacheHttpImpl(
uri = producerApiBaseUrl.toURI,
maybeSslContext = sslCtxOpt,
maybeCredentials = None
)

val resFuture = restClient
.get("execution-plans")
.map(_.fromJson[Array[String]])
.flatMap((ids: Array[String]) => {
if (ids.isEmpty) {
println(ansi"%yellow{No lineage data found in the database}")
Future.successful((0, 0))
} else {
println(s"Found ${ids.length} execution plans in the database. Exporting to $path/ ...")
ids.foldLeft(Future.successful((0, 0))) { (prevFut, planId) =>
prevFut.flatMap { case (nPlans, nEvents) =>
log.debug(s"Exporting execution plan with id: $planId")
val eventualPlanJson = restClient.get(s"execution-plans/$planId")
val eventualEventJsons = restClient.get(s"execution-plans/$planId/events")
for {
planJson <- eventualPlanJson
events <- eventualEventJsons.map(_.fromJson[Seq[Map[String, Any]]])
} yield {
Files.writeString(
path.toPath.resolve(s"plan-$planId.json"),
planJson,
StandardCharsets.UTF_8
)
events.foreach(event => {
val eventJson = event.toJson
Files.writeString(
path.toPath.resolve(s"event-$planId-${event("timestamp")}.json"),
eventJson,
StandardCharsets.UTF_8
)
})
(nPlans + 1, nEvents + events.length)
}
}
}
}
})

val (nPlans, nEvents) = Await.result(resFuture, Duration.Inf)
println(ansi"%green{Exported $nPlans execution plans and $nEvents execution events}")

case DBExec(url, actions) =>
val dbManager = dbManagerFactory.create(url, sslCtxOpt)
Await.result(dbManager.execute(actions: _*), Duration.Inf)
Expand Down
26 changes: 20 additions & 6 deletions admin/src/main/scala/za/co/absa/spline/admin/commands.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ package za.co.absa.spline.admin
import za.co.absa.spline.admin.DBCommand._
import za.co.absa.spline.persistence.{ArangoConnectionURL, AuxiliaryDBAction}

import java.io.File
import java.net.URL

sealed trait Command

sealed trait DBCommand extends Command {
def dbUrl: Url

def dbUrl_= : Url => Self = selfCopy(_)

protected type Self <: DBCommand
Expand All @@ -39,29 +43,39 @@ object DBCommand {
def unapply(cmd: DBCommand): Option[DBCommandProps] = Some((cmd.dbUrl))
}

case class LineageImport(
producerApiUrl: URL = null,
lineageDumpPath: File = null,
) extends Command

case class LineageExport(
producerApiUrl: URL = null,
lineageDumpPath: File = null,
) extends Command

case class DBInit(
override val dbUrl: Url = null,
force: Boolean = false,
skip: Boolean = false
) extends DBCommand {
protected override type Self = DBInit
protected override val selfCopy: DBCommandProps => Self = copy(_)
override protected type Self = DBInit
override protected val selfCopy: DBCommandProps => Self = copy(_, force, skip)
}

//noinspection ConvertibleToMethodValue
case class DBUpgrade(
override val dbUrl: Url = null,
) extends DBCommand {
protected override type Self = DBUpgrade
protected override val selfCopy: DBCommandProps => Self = copy(_)
override protected type Self = DBUpgrade
override protected val selfCopy: DBCommandProps => Self = copy(_)
}

case class DBExec(
override val dbUrl: Url = null,
actions: Seq[AuxiliaryDBAction] = Nil,
) extends DBCommand {
protected override type Self = DBExec
protected override val selfCopy: DBCommandProps => Self = copy(_)
override protected type Self = DBExec
override protected val selfCopy: DBCommandProps => Self = copy(_, actions)

def addAction(action: AuxiliaryDBAction): DBExec = copy(actions = actions :+ action)
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,13 @@ class AdminCLISpec
import za.co.absa.spline.persistence.AuxiliaryDBAction._
verify(arangoManagerMock).execute(IndicesDelete, ViewsDelete, FoxxReinstall, ViewsCreate, IndicesCreate)
}

behavior of "Lineage-import"

it should "call no action" in {
captureStdErr {
captureExitStatus(cli.exec(Array("lineage-import"))) should be(1)
} should include("--help")
}
}
}
27 changes: 20 additions & 7 deletions build/parent-pom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,11 @@
<maven.build.timestamp.format>yyyy-MM-dd'T'HH:mm:ssX</maven.build.timestamp.format>
<scala.version>2.12.19</scala.version>
<scala.compat.version>2.12</scala.compat.version>
<logback.version>1.2.6</logback.version>
<logback.version>1.2.13</logback.version>
<slf4j.version>1.7.25</slf4j.version>
<json4s.version>3.6.7</json4s.version>
<spring.version>5.2.25.RELEASE</spring.version>
<json4s.version>3.6.11</json4s.version>
<springfox-swagger.version>2.9.2</springfox-swagger.version>
<jackson.version>2.12.2</jackson.version>
<jackson.version>2.12.7</jackson.version>
<finatra.jackson.version>21.5.0</finatra.jackson.version>
<node.version>v20.14.0</node.version>
<npm.version>10.8.0</npm.version>
Expand Down Expand Up @@ -293,7 +292,7 @@
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_${scala.compat.version}</artifactId>
<version>0.9.1</version>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.scalaz</groupId>
Expand Down Expand Up @@ -443,7 +442,7 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-framework-bom</artifactId>
<version>${spring.version}</version>
<version>5.3.39</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down Expand Up @@ -529,6 +528,16 @@
<version>${logback.version}</version>
</dependency>

<!-- Netty -->

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>4.1.121.Final</version>
<type>pom</type>
<scope>import</scope>
</dependency>

<!-- others -->

<dependency>
Expand All @@ -546,12 +555,16 @@
<artifactId>config</artifactId>
<version>1.4.0</version>
</dependency>

<dependency>
<groupId>org.scala-graph</groupId>
<artifactId>graph-core_${scala.compat.version}</artifactId>
<version>1.12.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.4.8-jre</version>
</dependency>

<!-- Test scope dependencies -->

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.commons.configuration2

import org.apache.commons.lang3.StringUtils.{isBlank, isNotBlank}

import scala.jdk.CollectionConverters._
import scala.collection.JavaConverters._
import scala.reflect.runtime.universe.{TypeTag, typeOf}
import scala.util.Try

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@

package za.co.absa.spline.common.rest

import org.apache.http.entity.ContentType

import scala.concurrent.Future

trait RESTClient {
def get(path: String): Future[String]
def delete(path: String): Future[Unit]
def post(path: String, body: String): Future[Unit]
def post(path: String, body: Array[Byte]): Future[Unit]
def post(path: String, body: Array[Byte], contentType: ContentType): Future[Unit]
def post(path: String, body: String, contentType: ContentType): Future[Unit]
}
Loading