From 7f8f6a830c506589c0d7d671068762a26b0cd7f0 Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Mon, 24 Jan 2022 23:35:03 +0700 Subject: [PATCH 01/11] tensorflow 1 support in google colab --- sparkflow/HogwildSparkModel.py | 4 ++-- sparkflow/graph_utils.py | 2 +- sparkflow/ml_util.py | 2 +- sparkflow/tensorflow_async.py | 2 +- sparkflow/tensorflow_model_loader.py | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sparkflow/HogwildSparkModel.py b/sparkflow/HogwildSparkModel.py index bfc59d6..19c8c61 100644 --- a/sparkflow/HogwildSparkModel.py +++ b/sparkflow/HogwildSparkModel.py @@ -1,11 +1,11 @@ from flask import Flask, request import six.moves.cPickle as pickle -from sparkflow.ml_util import tensorflow_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle +from sparkflow.ml_util import tensorflow_core_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle from google.protobuf import json_format import socket import time -import tensorflow as tf +import tensorflow_core as tf import itertools from sparkflow.RWLock import RWLock from multiprocessing import Process diff --git a/sparkflow/graph_utils.py b/sparkflow/graph_utils.py index 89c644a..635a1dd 100644 --- a/sparkflow/graph_utils.py +++ b/sparkflow/graph_utils.py @@ -1,4 +1,4 @@ -import tensorflow as tf +import tensorflow_core as tf from google.protobuf import json_format import json diff --git a/sparkflow/ml_util.py b/sparkflow/ml_util.py index c1d70d4..dc0bcd1 100644 --- a/sparkflow/ml_util.py +++ b/sparkflow/ml_util.py @@ -1,5 +1,5 @@ import numpy as np -import tensorflow as tf +import tensorflow_core as tf import json from google.protobuf import json_format from pyspark.ml.linalg import Vectors diff --git a/sparkflow/tensorflow_async.py b/sparkflow/tensorflow_async.py index a8c5e3c..ca88ace 100644 --- a/sparkflow/tensorflow_async.py +++ b/sparkflow/tensorflow_async.py @@ -1,4 +1,4 @@ -import tensorflow as tf +import tensorflow_core as tf from sparkflow.pipeline_util import PysparkReaderWriter import numpy as np diff --git a/sparkflow/tensorflow_model_loader.py b/sparkflow/tensorflow_model_loader.py index b9e91fd..137faeb 100644 --- a/sparkflow/tensorflow_model_loader.py +++ b/sparkflow/tensorflow_model_loader.py @@ -1,4 +1,4 @@ -import tensorflow as tf +import tensorflow_core as tf from sparkflow.tensorflow_async import SparkAsyncDLModel from google.protobuf import json_format from pyspark.ml.pipeline import PipelineModel From c2a652db9df36c6b2fdd6b235035a3eca672b517 Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Mon, 24 Jan 2022 23:36:54 +0700 Subject: [PATCH 02/11] fix bug --- sparkflow/HogwildSparkModel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sparkflow/HogwildSparkModel.py b/sparkflow/HogwildSparkModel.py index 19c8c61..f419529 100644 --- a/sparkflow/HogwildSparkModel.py +++ b/sparkflow/HogwildSparkModel.py @@ -1,6 +1,6 @@ from flask import Flask, request import six.moves.cPickle as pickle -from sparkflow.ml_util import tensorflow_core_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle +from sparkflow.ml_util import tensorflow_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle from google.protobuf import json_format import socket From 41926a1bdb9d5f4d1b6f728d67e3c04f45016cb4 Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Tue, 25 Jan 2022 00:01:07 +0700 Subject: [PATCH 03/11] not callable function, tensorflow 1.15 --- sparkflow/graph_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sparkflow/graph_utils.py b/sparkflow/graph_utils.py index 635a1dd..fd16c9a 100644 --- a/sparkflow/graph_utils.py +++ b/sparkflow/graph_utils.py @@ -10,7 +10,7 @@ def build_graph(func): """ first_graph = tf.Graph() with first_graph.as_default() as g: - v = func() + v = func mg = json_format.MessageToJson(tf.train.export_meta_graph()) return mg From 0925156df8146cfdb408421f3f214449674bf961 Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Tue, 25 Jan 2022 00:07:48 +0700 Subject: [PATCH 04/11] use local import instead --- sparkflow/HogwildSparkModel.py | 4 ++-- sparkflow/tensorflow_async.py | 6 +++--- sparkflow/tensorflow_model_loader.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sparkflow/HogwildSparkModel.py b/sparkflow/HogwildSparkModel.py index f419529..c77ec6e 100644 --- a/sparkflow/HogwildSparkModel.py +++ b/sparkflow/HogwildSparkModel.py @@ -1,13 +1,13 @@ from flask import Flask, request import six.moves.cPickle as pickle -from sparkflow.ml_util import tensorflow_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle +from .sparkflow.ml_util import tensorflow_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle from google.protobuf import json_format import socket import time import tensorflow_core as tf import itertools -from sparkflow.RWLock import RWLock +from .sparkflow.RWLock import RWLock from multiprocessing import Process import multiprocessing import uuid diff --git a/sparkflow/tensorflow_async.py b/sparkflow/tensorflow_async.py index ca88ace..ef55f06 100644 --- a/sparkflow/tensorflow_async.py +++ b/sparkflow/tensorflow_async.py @@ -1,5 +1,5 @@ import tensorflow_core as tf -from sparkflow.pipeline_util import PysparkReaderWriter +from .sparkflow.pipeline_util import PysparkReaderWriter import numpy as np from pyspark.ml.param import Param, Params, TypeConverters @@ -8,8 +8,8 @@ from pyspark.ml import Model from pyspark.ml.util import Identifiable, MLReadable, MLWritable from pyspark import keyword_only -from sparkflow.HogwildSparkModel import HogwildSparkModel -from sparkflow.ml_util import convert_weights_to_json, predict_func +from .sparkflow.HogwildSparkModel import HogwildSparkModel +from .sparkflow.ml_util import convert_weights_to_json, predict_func from pyspark import SparkContext import json diff --git a/sparkflow/tensorflow_model_loader.py b/sparkflow/tensorflow_model_loader.py index 137faeb..b20d98d 100644 --- a/sparkflow/tensorflow_model_loader.py +++ b/sparkflow/tensorflow_model_loader.py @@ -1,5 +1,5 @@ import tensorflow_core as tf -from sparkflow.tensorflow_async import SparkAsyncDLModel +from .sparkflow.tensorflow_async import SparkAsyncDLModel from google.protobuf import json_format from pyspark.ml.pipeline import PipelineModel import json From 2d54b95f879c07a23de5e1d91dbf13341bc81609 Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Tue, 25 Jan 2022 00:25:59 +0700 Subject: [PATCH 05/11] import from local packages --- sparkflow/HogwildSparkModel.py | 4 ++-- sparkflow/tensorflow_async.py | 6 +++--- sparkflow/tensorflow_model_loader.py | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sparkflow/HogwildSparkModel.py b/sparkflow/HogwildSparkModel.py index c77ec6e..2050fa3 100644 --- a/sparkflow/HogwildSparkModel.py +++ b/sparkflow/HogwildSparkModel.py @@ -1,13 +1,13 @@ from flask import Flask, request import six.moves.cPickle as pickle -from .sparkflow.ml_util import tensorflow_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle +from ml_util import tensorflow_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle from google.protobuf import json_format import socket import time import tensorflow_core as tf import itertools -from .sparkflow.RWLock import RWLock +from RWLock import RWLock from multiprocessing import Process import multiprocessing import uuid diff --git a/sparkflow/tensorflow_async.py b/sparkflow/tensorflow_async.py index ef55f06..f7b7582 100644 --- a/sparkflow/tensorflow_async.py +++ b/sparkflow/tensorflow_async.py @@ -1,5 +1,5 @@ import tensorflow_core as tf -from .sparkflow.pipeline_util import PysparkReaderWriter +from pipeline_util import PysparkReaderWriter import numpy as np from pyspark.ml.param import Param, Params, TypeConverters @@ -8,8 +8,8 @@ from pyspark.ml import Model from pyspark.ml.util import Identifiable, MLReadable, MLWritable from pyspark import keyword_only -from .sparkflow.HogwildSparkModel import HogwildSparkModel -from .sparkflow.ml_util import convert_weights_to_json, predict_func +from HogwildSparkModel import HogwildSparkModel +from ml_util import convert_weights_to_json, predict_func from pyspark import SparkContext import json diff --git a/sparkflow/tensorflow_model_loader.py b/sparkflow/tensorflow_model_loader.py index b20d98d..0e5f497 100644 --- a/sparkflow/tensorflow_model_loader.py +++ b/sparkflow/tensorflow_model_loader.py @@ -1,5 +1,5 @@ import tensorflow_core as tf -from .sparkflow.tensorflow_async import SparkAsyncDLModel +from tensorflow_async import SparkAsyncDLModel from google.protobuf import json_format from pyspark.ml.pipeline import PipelineModel import json From 76340213e394eb521f82afe6237297483dc88b35 Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Tue, 25 Jan 2022 00:27:17 +0700 Subject: [PATCH 06/11] test --- sparkflow/tensorflow_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sparkflow/tensorflow_async.py b/sparkflow/tensorflow_async.py index f7b7582..9cfbc6b 100644 --- a/sparkflow/tensorflow_async.py +++ b/sparkflow/tensorflow_async.py @@ -1,5 +1,5 @@ import tensorflow_core as tf -from pipeline_util import PysparkReaderWriter +from .pipeline_util import PysparkReaderWriter import numpy as np from pyspark.ml.param import Param, Params, TypeConverters From c2ae2a444b5c8caae90f5e71287e336dc4c233fd Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Tue, 25 Jan 2022 00:29:35 +0700 Subject: [PATCH 07/11] test error --- sparkflow/tensorflow_async.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sparkflow/tensorflow_async.py b/sparkflow/tensorflow_async.py index 9cfbc6b..663cf25 100644 --- a/sparkflow/tensorflow_async.py +++ b/sparkflow/tensorflow_async.py @@ -1,5 +1,5 @@ import tensorflow_core as tf -from .pipeline_util import PysparkReaderWriter +import pipeline_util.PysparkReaderWriter import numpy as np from pyspark.ml.param import Param, Params, TypeConverters @@ -8,8 +8,8 @@ from pyspark.ml import Model from pyspark.ml.util import Identifiable, MLReadable, MLWritable from pyspark import keyword_only -from HogwildSparkModel import HogwildSparkModel -from ml_util import convert_weights_to_json, predict_func +import HogwildSparkModel.HogwildSparkModel +import ml_util.convert_weights_to_json, ml_util.predict_func from pyspark import SparkContext import json From aa47011100243d8b20861ed401a6c5a4d13d5d1c Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Tue, 25 Jan 2022 00:33:46 +0700 Subject: [PATCH 08/11] test --- sparkflow/tensorflow_async.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sparkflow/tensorflow_async.py b/sparkflow/tensorflow_async.py index 663cf25..2aa55e1 100644 --- a/sparkflow/tensorflow_async.py +++ b/sparkflow/tensorflow_async.py @@ -1,5 +1,5 @@ import tensorflow_core as tf -import pipeline_util.PysparkReaderWriter +from pipeline_util import PysparkReaderWriter import numpy as np from pyspark.ml.param import Param, Params, TypeConverters @@ -8,8 +8,8 @@ from pyspark.ml import Model from pyspark.ml.util import Identifiable, MLReadable, MLWritable from pyspark import keyword_only -import HogwildSparkModel.HogwildSparkModel -import ml_util.convert_weights_to_json, ml_util.predict_func +from HogwildSparkModel import HogwildSparkModel +from ml_util import convert_weights_to_json, predict_func from pyspark import SparkContext import json From 444c6bcd15db540457ae86f379896242b63e9fb4 Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Tue, 25 Jan 2022 18:05:16 +0700 Subject: [PATCH 09/11] global variables instead of neurons --- sparkflow/HogwildSparkModel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sparkflow/HogwildSparkModel.py b/sparkflow/HogwildSparkModel.py index 2050fa3..96aa99b 100644 --- a/sparkflow/HogwildSparkModel.py +++ b/sparkflow/HogwildSparkModel.py @@ -187,7 +187,7 @@ def start_service(self, metagraph, optimizer, port): new_graph = tf.Graph() with new_graph.as_default(): tf.train.import_meta_graph(metagraph) - loss_variable = tf.get_collection(tf.GraphKeys.LOSSES)[0] + loss_variable = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES) trainable_variables = tf.trainable_variables() grads = tf.gradients(loss_variable, trainable_variables) grads = list(zip(grads, trainable_variables)) From f47736d9d6749c0976dd9bfe30fdec1a86343d4c Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Tue, 25 Jan 2022 18:10:05 +0700 Subject: [PATCH 10/11] switch to global variables --- sparkflow/HogwildSparkModel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sparkflow/HogwildSparkModel.py b/sparkflow/HogwildSparkModel.py index 96aa99b..fe9034d 100644 --- a/sparkflow/HogwildSparkModel.py +++ b/sparkflow/HogwildSparkModel.py @@ -47,7 +47,7 @@ def handle_model(data, graph_json, tfInput, tfLabel=None, new_graph = tf.Graph() with tf.Session(graph=new_graph) as sess: tf.train.import_meta_graph(gd) - loss_variable = tf.get_collection(tf.GraphKeys.LOSSES)[0] + loss_variable = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES) sess.run(tf.global_variables_initializer()) trainable_variables = tf.trainable_variables() grads = tf.gradients(loss_variable, trainable_variables) From 6e74f7661372cd31ae267965f25d05315ab8e0f6 Mon Sep 17 00:00:00 2001 From: mingkhoi Date: Tue, 25 Jan 2022 18:40:59 +0700 Subject: [PATCH 11/11] change master_url --- sparkflow/HogwildSparkModel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sparkflow/HogwildSparkModel.py b/sparkflow/HogwildSparkModel.py index fe9034d..d4d6d65 100644 --- a/sparkflow/HogwildSparkModel.py +++ b/sparkflow/HogwildSparkModel.py @@ -19,7 +19,7 @@ log.setLevel(logging.ERROR) -def get_server_weights(master_url='localhost:5000'): +def get_server_weights(master_url='0.0.0.0:5000'): """ This will get the raw weights, pickle load them, and return. """ @@ -28,7 +28,7 @@ def get_server_weights(master_url='localhost:5000'): return weights -def put_deltas_to_server(delta, master_url='localhost:5000'): +def put_deltas_to_server(delta, master_url='0.0.0.0:5000'): """ This updates the master parameters. We just use simple pickle serialization here. """ @@ -36,7 +36,7 @@ def put_deltas_to_server(delta, master_url='localhost:5000'): def handle_model(data, graph_json, tfInput, tfLabel=None, - master_url='localhost:5000', iters=1000, + master_url='0.0.0.0:5000', iters=1000, mini_batch_size=-1, shuffle=True, mini_stochastic_iters=-1, verbose=0, loss_callback=None): is_supervised = tfLabel is not None