Skip to content

Push replication #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: data-services-proto
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions LR/development.ini.orig
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ couchdb.db.resourcecount = _design/learningregistry-resources/_view/count
couchdb.threshold.distributes = 1000
couchdb.threshold.viewupdate = 100
couchdb.stale.flag = update_after

lr.distribute.url = http://localhost/distribute

lr.push_to.docid = distribute:Push To Distribution service
lr.obtain.docid = access:Basic Obtain service
lr.harvest.docid = access:Basic Harvest service
lr.oaipmh.docid = access:OAI-PMH service
Expand All @@ -83,6 +86,7 @@ lr.services.docid = administrative:Network Node Services service
lr.policy.docid = administrative:Resource Distribution Network Policy service
lr.status.docid = administrative:Network Node Status service
lr.distribute.docid = distribute:Resource Data Distribution service

specs_dir = %(here)s/data/specs
spec.models.node_description =%(specs_dir)s/v_0_23/models/node_description

Expand Down
8 changes: 5 additions & 3 deletions LR/lr/config/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def make_map(config):
always_scan=config['debug'])


def mapResource(config_key, member_name, collection_name):
def mapResource(config_key, member_name, collection_name, action=None):
try:
service_doc_id = config[config_key]
service_doc = h.getServiceDocument(service_doc_id)
Expand All @@ -29,8 +29,9 @@ def mapResource(config_key, member_name, collection_name):
map.connect("/"+collection_name,controller=collection_name,action='options',conditions=dict(method=['OPTIONS']))
if member_name == 'swordservice':
map.connect("/swordpub",controller='swordservice',action='create')

if member_name == 'distribute':
elif member_name == 'push_to':
map.connect("/push_to", controller='distribute', action='push_to')
elif member_name == 'distribute':
map.connect("/destination", controller='distribute', action='destination',
conditions=dict(method='GET'))
log.info("Enabling service route for: {0} member: {1} collection: {2}".format(service_doc_id, member_name, collection_name))
Expand All @@ -44,6 +45,7 @@ def mapResource(config_key, member_name, collection_name):
path_prefix='/contrib', name_prefix='contrib_')
mapResource('lr.status.docid', 'status','status')
mapResource('lr.distribute.docid','distribute','distribute')
mapResource('lr.push_to.docid', 'push_to', 'distribute')
if not LRNode.nodeDescription.gateway_node:
mapResource('lr.publish.docid', 'publish','publish')
mapResource('lr.obtain.docid', 'obtain','obtain')
Expand Down
24 changes: 24 additions & 0 deletions LR/lr/controllers/distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,31 @@ def destination(self):

log.info("received distribute request...returning: \n"+pprint.pformat(response, 4))
return json.dumps(response)

def push_to(self):
    """Start a push replication of this node's resource data.

    The request body must be a JSON object that contains at least the key
    ``destination_database_url``.  The whole parsed object is also placed in
    the replication document as ``query_params``.  The replication document
    is sent to the ``_replicator`` database of the configured CouchDB
    server and the server's JSON reply is returned as a JSON string.

    Raises:
        ValueError: if the request body is not valid JSON.
        KeyError: if ``destination_database_url`` is missing.
        urllib2.URLError: if the CouchDB server cannot be reached or
            rejects the replication document.
    """
    log.debug("\n\nPush request: {0}\n\n".format(pprint.pformat(request.body, indent=2)))
    pushOptions = json.loads(request.body)
    # Look the destination up once so a missing key fails before any
    # request object is built.
    destinationUrl = pushOptions['destination_database_url']

    replicationOptions = {'filter': ResourceDataModel.REPLICATION_FILTER,
                          'source': self.resource_data,
                          'target': destinationUrl,
                          'query_params': pushOptions}

    req = urllib2.Request(urlparse.urljoin(appConfig['couchdb.url'], '_replicator'),
                          headers={'Content-Type': 'application/json'},
                          data=json.dumps(replicationOptions))

    log.info("\n\nPush started\nSource:{0}\nDestionation:{1}\nArgs:{2}".format(
        self.resource_data, destinationUrl,
        pprint.pformat(replicationOptions)))

    # Close the HTTP response explicitly; the original left the connection
    # open until garbage collection.
    response = urllib2.urlopen(req)
    try:
        results = json.load(response)
    finally:
        response.close()
    log.debug("Replication results: " + pprint.pformat(results))

    return json.dumps(results)


def _getDistinationInfo(self, connection):
# Make sure we only have one slash in the url path. More than one
# confuses the pylons routing library.
Expand Down
153 changes: 95 additions & 58 deletions LR/lr/tests/functional/distribute/lr_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import urllib2
import urlparse
from services.Resource_Data_Distribution import __ResourceDataDistributionServiceTemplate as DistributeServiceTemplate
from services.Push_To_Distribution import __PushToDistributionServiceTemplate as PushToServiceTemplate
import subprocess
from lr.lib import helpers as h
from time import sleep
Expand Down Expand Up @@ -67,6 +68,7 @@ def __init__(self, nodeConfig, nodeName, communityId=None, networkId=None):
self._setupDescriptions()
self._setupNode()
self._setupDistributeService()
self._setupPushToService()
self.setNodeInfo(nodeName)
if communityId is not None:
self.setCommunityInfo(communityId)
Expand All @@ -75,7 +77,8 @@ def __init__(self, nodeConfig, nodeName, communityId=None, networkId=None):
self.removeTestLog()
# Keep around the replication documents that are stored in the replication
# database so that they can be deleted when the node is torn down.
self._distributeResultsList = []
self._replicationIdList = []


def _setupFilePaths(self):

Expand Down Expand Up @@ -113,6 +116,9 @@ def removeTestLog(self):
except:
pass

def getResourceDataDbUrl(self):
    """Return the URL formed by joining the configured couch server URL
    with the configured resource data name (both read from the
    ``couch_info`` section of the node configuration)."""
    serverUrl = self._nodeConfig.get("couch_info", "server")
    resourceDataName = self._nodeConfig.get("couch_info", "resourcedata")
    return urlparse.urljoin(serverUrl, resourceDataName)
def _setupNode(self):
#create the couch db databases
self._server = couchdb.Server(url=self._nodeConfig.get("couch_info", "server"))
Expand All @@ -136,6 +142,20 @@ def _setupDistributeService(self):
setup_utils.PublishDoc(self._server, self._nodeConfig.get("couch_info", "node") ,
doc["service_type"]+":Resource Data Distribution service", doc)

def _setupPushToService(self):
    """Render the Push To service description and publish it.

    The template is rendered with this node's URL as endpoint, a fresh
    random service id and ``active`` set to True.  The resulting document
    is handed to ``setup_utils.PublishDoc`` together with the couch_info
    ``node`` setting, under an id built from the document's service type.
    """
    serviceOptions = {
        "node_endpoint": self._getNodeUrl(),
        "service_id": uuid.uuid4().hex,
        "active": True,
    }

    renderedDoc = PushToServiceTemplate().render(**serviceOptions)
    serviceDoc = json.loads(renderedDoc)

    nodeSetting = self._nodeConfig.get("couch_info", "node")
    serviceDocId = serviceDoc["service_type"] + ":Push To Distribution service"
    setup_utils.PublishDoc(self._server, nodeSetting, serviceDocId, serviceDoc)


def _setupPylonsConfig(self):
#Read the original configuration and update with the test node data.
pylonsConfig = ConfigParser.ConfigParser()
Expand Down Expand Up @@ -288,6 +308,19 @@ def waitOnChangeMonitor(self):
except Exception as e:
log.exception(e)
print(e)

def waitOnReplicationById(self, replicationId, destinationNode=None, pollInterval=30):
    """Poll a replication document until the replication has stopped.

    Args:
        replicationId: id of the document in the replicator database.
        destinationNode: label of the target node; only used in the
            printed status report.
        pollInterval: seconds to sleep between polls (defaults to the
            previously hard-coded 30).

    Returns:
        The final ``_replication_state`` value: ``'completed'`` or
        ``'error'``.
    """
    replicationUrl = self._replicatorUrl + '/' + replicationId
    while True:
        response = urllib2.urlopen(replicationUrl)
        try:
            doc = json.load(response)
        finally:
            # Release the connection even when the body is not valid JSON.
            response.close()

        state = doc.get('_replication_state')
        print('\n\n---------------Replication Status-----------')
        print('--Replication:{1}\n<=From node:\t{0}\n=>To node:\t{2}\n<=>completion status: \t{3}\n'.format(
            self._nodeName, replicationId, str(destinationNode), state))

        if state == 'completed':
            return state
        if state == 'error':
            # A failed replication never becomes 'completed'; stop waiting
            # instead of polling forever.
            log.error("Replication {0} from node {1} to {2} ended in error state".format(
                replicationId, self._nodeName, str(destinationNode)))
            return state
        sleep(pollInterval)


def waitOnReplication(self, distributeResults):
"""Wait for the replication to complete on the all node connections specifed
Expand All @@ -298,30 +331,35 @@ def waitOnReplication(self, distributeResults):
print ("node {0} has no replication results or distributable docs ....".format(self._nodeName))
return

waiting = True
while(waiting):
# Set waiting to false and check to the replication document to see
# if replication not completed for any of the connections, if not then
# reset waiting to true.
waiting =False
for connectionResults in distributeResults['connections']:
if 'replication_results' not in connectionResults:
continue
#Get the replication document.
response = urllib2.urlopen(self._replicatorUrl+'/'+connectionResults['replication_results']['id'])
doc = json.load(response)
response.close()

print('\n\n---------------Replication Status-----------')
print('<=From node:\t{0}\n=>To node:\t{1}\n<=>completion status: \t{2}\n'.format(
self._nodeName,
connectionResults['destinationNodeInfo']['resource_data_url'].split('/')[-1].split('_resource_data')[0],
doc.get('_replication_state')))

if doc.get('_replication_state') != 'completed':
waiting = True
sleep(30)
continue
for connectionResults in distributeResults['connections']:
if 'replication_results' not in connectionResults:
continue
self._replicationIdList.append(connectionResults['replication_results']['id'])
self.waitOnReplicationById(connectionResults['replication_results']['id'],
connectionResults['destinationNodeInfo']['resource_data_url'].split('/')[-1].split('_resource_data')[0])

def pushTo(self, destinationNode, predicateFunc=None):
    """Push this node's resource data to ``destinationNode``.

    Posts a request to this node's ``/push_to`` URL, records the returned
    replication id for later cleanup, and waits for that replication.

    Args:
        destinationNode: node object to push to.  Its filter description,
            when it has one, is sent along with the request.
        predicateFunc: optional value sent as the ``predicate`` option.  It
            is passed through ``json.dumps`` and so must be
            JSON-serializable.

    Returns:
        The parsed JSON reply of the ``/push_to`` request.
    """
    pushToOptions = {"destination_database_url": destinationNode.getResourceDataDbUrl()}

    filterDescription = destinationNode._nodeFilterDescription
    if filterDescription is not None:
        pushToOptions['filter'] = filterDescription["filter"]
        pushToOptions['include_exclude'] = filterDescription["include_exclude"]

    if predicateFunc is not None:
        pushToOptions['predicate'] = predicateFunc

    request = urllib2.Request(self._getNodeUrl() + "/push_to",
                              headers={'Content-Type': 'application/json'},
                              data=json.dumps(pushToOptions))

    # Close the HTTP response explicitly instead of leaking the connection.
    response = urllib2.urlopen(request)
    try:
        results = json.load(response)
    finally:
        response.close()

    # Remember the replication document so it can be removed later.
    self._replicationIdList.append(results['id'])
    self.waitOnReplicationById(results['id'], destinationNode._nodeName)
    print("Push to reponse: \n{0}".format(pprint.pformat(results)))

    return results

def distribute(self, waitOnReplication=True):
""" Distribute to all the node connections. When waited for completion is
Expand All @@ -333,15 +371,16 @@ def distribute(self, waitOnReplication=True):
data,
{'Content-Type':'application/json; charset=utf-8'})

self._distributeResultsList.append(json.load(urllib2.urlopen(request)))
print("Distribute reponse: \n{0}".format(pprint.pformat(self._distributeResultsList[-1])))
results = json.load(urllib2.urlopen(request))
print("Distribute reponse: \n{0}".format(pprint.pformat(results)))

if waitOnReplication:
self.waitOnReplication(self._distributeResultsList[-1])
return self._distributeResultsList[-1]
self.waitOnReplication(results)

return results

def getResourceDataDocs(self, filter_description=None, doc_type='resource_data', include_docs=True):
def getResourceDataDocs(self, filter_description=None, predicate=None,
doc_type='resource_data', include_docs=True):

db = self._server[self._nodeConfig.get("couch_info", "resourcedata")]
#For source node get all the resource_data documents using the filter
Expand All @@ -351,17 +390,19 @@ def getResourceDataDocs(self, filter_description=None, doc_type='resource_data',
"doc_type":doc_type}
if filter_description is not None:
options["filter_description"] = json.dumps(filter_description)
if predicate is not None:
options["predicate"] = predicate
return db.changes(**options)["results"]


def compareDistributedResources(self, destination, filter_description=None):
def compareDistributedResources(self, destination, filter_description=None, predicate=None):
"""This method considers this node as source node.
It compares its resource_data documents with the destination node's to
verify that data was distributed. This comparison assumes that distribute/
replication is done and that there are no other additions or deletions on the
nodes that are being compared"""

sourceResults = self.getResourceDataDocs(filter_description)
sourceResults = self.getResourceDataDocs(filter_description, predicate)
destinationResults = destination.getResourceDataDocs()

#check the number of source document is the same at destination.
Expand Down Expand Up @@ -459,32 +500,28 @@ def removeReplicationDocs(self):
replication document stated as completed with the same source and target
database name even though the document is about a database that has been
deleted and recreated. '''
for distributeResults in self._distributeResultsList:
if ('connections' in distributeResults) == False:
continue
for connectionResults in distributeResults['connections']:
if 'replication_results' in connectionResults:
try:
#Use urllib request to remove the replication documents, the python
#couchdb interface has a bug at time of this write that prevents access
# to the _replicator database.

#first get the lastest version of the doc
response = urllib2.urlopen(self._replicatorUrl+'/'+connectionResults['replication_results']['id'])
doc = json.load(response)
response.close()
print ("\n\n--node {0} deleting replication doc: {1}".format(
self._nodeName,
self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev'])))

request = urllib2.Request(self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev']),
headers={'Content-Type':'application/json' })
for replicationId in self._replicationIdList:
try:
#Use urllib request to remove the replication documents, the python
#couchdb interface has a bug at time of this write that prevents access
# to the _replicator database.

#first get the lastest version of the doc
response = urllib2.urlopen(self._replicatorUrl+'/'+replicationId)
doc = json.load(response)
response.close()
print ("\n\n--node {0} deleting replication doc: {1}".format(
self._nodeName,
self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev'])))

request = urllib2.Request(self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev']),
headers={'Content-Type':'application/json' })

request.get_method = lambda: "DELETE"

request.get_method = lambda: "DELETE"

urllib2.urlopen(request)
except Exception as e:
log.exception(e)
urllib2.urlopen(request)
except Exception as e:
log.exception(e)

def tearDown(self):
self.stop()
Expand Down
Loading