# parallel_with_shared_memory_fit.py
# NOTE: this file was scraped from a GitHub page; the original scrape
# residue (navigation chrome and a 1-50 line-number gutter) has been
# removed so the module parses as Python.
import itertools
import time
from multiprocessing import Process, Pipe
import pandas as pd
import time
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.neural_network import MLPRegressor
#########
# Module-level test fixture: load a ~1 GB DataFrame from disk and
# concatenate four copies (~4 GB) so the memory behaviour of forked
# worker processes can be observed externally (e.g. with `top`).
huge_dict = pd.read_hdf('my_dataframe.hdf','table') #~roughly 1 GB
X = pd.concat([huge_dict,huge_dict,huge_dict,huge_dict],axis=0) #~roughly 4 GB
# Target: second column of X — an arbitrary choice for this fitting demo.
y = X.iloc[:,1]
#y = np.sum(np.arange(0, X.shape[1]) * X, axis=1)
print('global objects taken')
def main_func(X, y, i, pipe=None):
    """Fit an MLPRegressor on (X, y) inside worker *i*.

    Parameters
    ----------
    X, y : training data inherited/passed from the parent process.
    i : int worker index, used only in the progress message.
    pipe : optional multiprocessing Connection. When given, the
        prediction list is sent through it; otherwise it is returned.

    Returns
    -------
    list — ``y_pred`` (empty here, since predict() is commented out for
    this memory experiment) when no pipe is supplied.
    """
    print("worker ", i)
    y_pred = []
    # mdl = GradientBoostingRegressor(n_estimators=1)
    mdl = MLPRegressor()
    mdl.fit(X, y)
    # y_pred = mdl.predict(X)
    # Keep the worker alive after fitting so its memory footprint can be
    # inspected from outside — presumably the point of this experiment.
    time.sleep(100)
    if pipe:
        # BUG FIX: the original did `pipe.send(result)`, but `result` is
        # never defined in this function (NameError at runtime).
        # Send the computed y_pred, mirroring the return path below.
        pipe.send(y_pred)
    else:
        return y_pred
def run_normal_parallel(X, y):
    """Run two worker processes that each fit a model on (X, y).

    Each worker executes ``main_func(X, y, i, child_pipe)``; the parent
    collects the per-worker results through pipes, prints them, and then
    terminates the workers (they deliberately sleep after fitting, so a
    plain ``join()`` would block for the sleep duration).
    """
    workers = []
    pipes = []
    for i in range(2):
        parent, child = Pipe()
        worker = Process(target=main_func, args=(X, y, i, child))
        pipes.append(parent)
        # BUG FIX: the started Process was never appended to `workers`,
        # so the terminate() loop below iterated an empty list and the
        # children were left running — hence the manual `kill -9` note
        # at the bottom of this file.
        workers.append(worker)
        worker.start()
    result = []
    for pipe in pipes:
        # recv() blocks until the worker sends its (possibly empty) list.
        result.extend(pipe.recv())
    for worker in workers:
        worker.terminate()
    # BUG FIX: `print result` is Python 2 statement syntax (SyntaxError
    # on Python 3); the rest of the file uses the print() function.
    print(result)
#### main part #####
# BUG FIX: guard the entry point. Without this, multiprocessing start
# methods that re-import the module in child processes (spawn — the
# default on Windows and macOS) would re-execute this call recursively.
if __name__ == '__main__':
    run_normal_parallel(X, y)
## Then, to kill this wild beast here in shell
#kill -9 $(ps -ef | grep gfasane | grep python | grep name_of_main_program | awk '{print$2}')