Skip to content

Commit 7f6875e

Browse files
author
blublinsky
committed
Some cleanup
1 parent c92a741 commit 7f6875e

File tree

6 files changed

+26
-11
lines changed

6 files changed

+26
-11
lines changed

src/main/scala/com/lightbend/kafka/ModelProvider.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ object ModelProvider {
4242
tRecord.writeTo(bos)
4343
sender.writeValue(ModelServingConfiguration.MODELS_TOPIC, bos.toByteArray)
4444
pause()
45-
4645
})
4746
}
4847
}

src/main/scala/com/lightbend/modelServer/DataProcessor.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ class DataProcessor extends RichCoProcessFunction[WineRecord, ModelToServe, Doub
4646
println(s"New model - $model")
4747
newModelState.update(new ModelToServeStats(model))
4848
newModel = model.modelType match {
49-
case ModelDescriptor.ModelType.PMML => Some(PMMLModel(model.model))
50-
case ModelDescriptor.ModelType.TENSORFLOW => Some(TensorFlowModel(model.model))
49+
case ModelDescriptor.ModelType.PMML => PMMLModel(model.model) // PMML
50+
case ModelDescriptor.ModelType.TENSORFLOW => TensorFlowModel(model.model) // Tensorflow
5151
case _ => None // Not supported yet
5252
}
5353
}

src/main/scala/com/lightbend/modelServer/DataProcessorMap.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ class DataProcessorMap extends RichCoFlatMapFunction[WineRecord, ModelToServe, D
2727

2828
override def flatMap2(model: ModelToServe, out: Collector[Double]): Unit = {
2929
println(s"New model - $model")
30-
newModel = model.modelType match {
31-
case ModelDescriptor.ModelType.PMML => Some(new PMMLModel(model.model))
32-
case ModelDescriptor.ModelType.TENSORFLOW => Some(TensorFlowModel(model.model))
30+
newModel =
31+
model.modelType match {
32+
case ModelDescriptor.ModelType.PMML => PMMLModel(model.model) // PMML
33+
case ModelDescriptor.ModelType.TENSORFLOW => TensorFlowModel(model.model) // Tensorflow
3334
case _ => None // Not supported yet
3435
}
3536
}

src/main/scala/com/lightbend/modelServer/ModelTester.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ object ModelTester {
2424
val Tensormodel = TensorFlowModel(TensorflowbyteArray)
2525
println("PMML | Tensorflow")
2626
records.foreach(r => {
27-
val presult = PMMLmodel.score(r.asInstanceOf[AnyVal]).asInstanceOf[Double]
28-
val tresult = Tensormodel.score(r.asInstanceOf[AnyVal]).asInstanceOf[Double]
27+
val presult = PMMLmodel.get.score(r.asInstanceOf[AnyVal]).asInstanceOf[Double]
28+
val tresult = Tensormodel.get.score(r.asInstanceOf[AnyVal]).asInstanceOf[Double]
2929
println(s"$presult | $tresult")
3030
})
3131
}

src/main/scala/com/lightbend/modelServer/model/PMMLModel.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,13 @@ class PMMLModel(inputStream: Array[Byte]) extends Model {
7171

7272
object PMMLModel{
7373

74-
def apply(inputStream: Array[Byte]): PMMLModel = new PMMLModel(inputStream)
74+
def apply(inputStream: Array[Byte]): Option[PMMLModel] = {
75+
try {
76+
Some(new PMMLModel(inputStream))
77+
}catch{
78+
case t: Throwable => None
79+
}
80+
}
7581
private val optimizers = Array(new ExpressionOptimizer, new FieldOptimizer, new PredicateOptimizer, new GeneralRegressionModelOptimizer, new NaiveBayesModelOptimizer, new RegressionModelOptimizer)
7682
def optimize(pmml : PMML) = this.synchronized {
7783
optimizers.foreach(opt =>

src/main/scala/com/lightbend/modelServer/model/TensorFlowModel.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,18 @@ class TensorFlowModel(inputStream: Array[Byte]) extends Model {
4444
value._1.toDouble
4545
}
4646

47-
override def cleanup(): Unit = session.close
47+
override def cleanup(): Unit = {
48+
session.close
49+
graph.close
50+
}
4851
}
4952

5053
object TensorFlowModel{
51-
def apply(inputStream: Array[Byte]): TensorFlowModel = new TensorFlowModel(inputStream)
54+
def apply(inputStream: Array[Byte]): Option[TensorFlowModel] = {
55+
try {
56+
Some(new TensorFlowModel(inputStream))
57+
}catch{
58+
case t: Throwable => None
59+
}
60+
}
5261
}

0 commit comments

Comments
 (0)