Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions sparkflow/HogwildSparkModel.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from flask import Flask, request
import six.moves.cPickle as pickle
from sparkflow.ml_util import tensorflow_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle
from ml_util import tensorflow_get_weights, tensorflow_set_weights, handle_features, handle_feed_dict, handle_shuffle

from google.protobuf import json_format
import socket
import time
import tensorflow as tf
import tensorflow_core as tf
import itertools
from sparkflow.RWLock import RWLock
from RWLock import RWLock
from multiprocessing import Process
import multiprocessing
import uuid
Expand All @@ -19,7 +19,7 @@
log.setLevel(logging.ERROR)


def get_server_weights(master_url='localhost:5000'):
def get_server_weights(master_url='0.0.0.0:5000'):
"""
This will get the raw weights, pickle load them, and return.
"""
Expand All @@ -28,15 +28,15 @@ def get_server_weights(master_url='localhost:5000'):
return weights


def put_deltas_to_server(delta, master_url='localhost:5000'):
def put_deltas_to_server(delta, master_url='0.0.0.0:5000'):
"""
This updates the master parameters. We just use simple pickle serialization here.
"""
requests.post('http://{0}/update'.format(master_url), data=pickle.dumps(delta, -1))


def handle_model(data, graph_json, tfInput, tfLabel=None,
master_url='localhost:5000', iters=1000,
master_url='0.0.0.0:5000', iters=1000,
mini_batch_size=-1, shuffle=True,
mini_stochastic_iters=-1, verbose=0, loss_callback=None):
is_supervised = tfLabel is not None
Expand All @@ -47,7 +47,7 @@ def handle_model(data, graph_json, tfInput, tfLabel=None,
new_graph = tf.Graph()
with tf.Session(graph=new_graph) as sess:
tf.train.import_meta_graph(gd)
loss_variable = tf.get_collection(tf.GraphKeys.LOSSES)[0]
loss_variable = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
sess.run(tf.global_variables_initializer())
trainable_variables = tf.trainable_variables()
grads = tf.gradients(loss_variable, trainable_variables)
Expand Down Expand Up @@ -187,7 +187,7 @@ def start_service(self, metagraph, optimizer, port):
new_graph = tf.Graph()
with new_graph.as_default():
tf.train.import_meta_graph(metagraph)
loss_variable = tf.get_collection(tf.GraphKeys.LOSSES)[0]
loss_variable = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
trainable_variables = tf.trainable_variables()
grads = tf.gradients(loss_variable, trainable_variables)
grads = list(zip(grads, trainable_variables))
Expand Down
4 changes: 2 additions & 2 deletions sparkflow/graph_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import tensorflow as tf
import tensorflow_core as tf
from google.protobuf import json_format
import json

Expand All @@ -10,7 +10,7 @@ def build_graph(func):
"""
first_graph = tf.Graph()
with first_graph.as_default() as g:
v = func()
v = func
mg = json_format.MessageToJson(tf.train.export_meta_graph())
return mg

Expand Down
2 changes: 1 addition & 1 deletion sparkflow/ml_util.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import numpy as np
import tensorflow as tf
import tensorflow_core as tf
import json
from google.protobuf import json_format
from pyspark.ml.linalg import Vectors
Expand Down
8 changes: 4 additions & 4 deletions sparkflow/tensorflow_async.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import tensorflow as tf
from sparkflow.pipeline_util import PysparkReaderWriter
import tensorflow_core as tf
from pipeline_util import PysparkReaderWriter
import numpy as np

from pyspark.ml.param import Param, Params, TypeConverters
Expand All @@ -8,8 +8,8 @@
from pyspark.ml import Model
from pyspark.ml.util import Identifiable, MLReadable, MLWritable
from pyspark import keyword_only
from sparkflow.HogwildSparkModel import HogwildSparkModel
from sparkflow.ml_util import convert_weights_to_json, predict_func
from HogwildSparkModel import HogwildSparkModel
from ml_util import convert_weights_to_json, predict_func
from pyspark import SparkContext
import json

Expand Down
4 changes: 2 additions & 2 deletions sparkflow/tensorflow_model_loader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import tensorflow as tf
from sparkflow.tensorflow_async import SparkAsyncDLModel
import tensorflow_core as tf
from tensorflow_async import SparkAsyncDLModel
from google.protobuf import json_format
from pyspark.ml.pipeline import PipelineModel
import json
Expand Down