diff --git a/LR/development.ini.orig b/LR/development.ini.orig
index 26e7a602..50dbd025 100755
--- a/LR/development.ini.orig
+++ b/LR/development.ini.orig
@@ -70,7 +70,10 @@ couchdb.db.resourcecount = _design/learningregistry-resources/_view/count
 couchdb.threshold.distributes = 1000
 couchdb.threshold.viewupdate = 100
 couchdb.stale.flag = update_after
+
 lr.distribute.url = http://localhost/distribute
+
+lr.push_to.docid = distribute:Push To Distribution service
 lr.obtain.docid = access:Basic Obtain service
 lr.harvest.docid = access:Basic Harvest service
 lr.oaipmh.docid = access:OAI-PMH service
@@ -83,6 +86,7 @@ lr.services.docid = administrative:Network Node Services service
 lr.policy.docid = administrative:Resource Distribution Network Policy service
 lr.status.docid = administrative:Network Node Status service
 lr.distribute.docid = distribute:Resource Data Distribution service
+
 
 specs_dir = %(here)s/data/specs
 spec.models.node_description =%(specs_dir)s/v_0_23/models/node_description
diff --git a/LR/lr/config/routing.py b/LR/lr/config/routing.py
index 2b039a70..58616e0b 100755
--- a/LR/lr/config/routing.py
+++ b/LR/lr/config/routing.py
@@ -20,7 +20,7 @@ def make_map(config):
                  always_scan=config['debug'])
 
 
-    def mapResource(config_key, member_name, collection_name):
+    def mapResource(config_key, member_name, collection_name, action=None):
        try:
            service_doc_id = config[config_key]
            service_doc = h.getServiceDocument(service_doc_id)
@@ -29,8 +29,9 @@ def mapResource(config_key, member_name, collection_name):
            map.connect("/"+collection_name,controller=collection_name,action='options',conditions=dict(method=['OPTIONS']))
            if member_name == 'swordservice':
                map.connect("/swordpub",controller='swordservice',action='create')
-
-           if member_name == 'distribute':
+           elif member_name == 'push_to':
+               map.connect("/push_to", controller='distribute', action='push_to')
+           elif member_name == 'distribute':
                map.connect("/destination", controller='distribute', action='destination', conditions=dict(method='GET'))
 
            log.info("Enabling service route for: {0} member: {1} collection: {2}".format(service_doc_id, member_name, collection_name))
@@ -44,6 +45,7 @@ def mapResource(config_key, member_name, collection_name):
                  path_prefix='/contrib', name_prefix='contrib_')
    mapResource('lr.status.docid', 'status','status')
    mapResource('lr.distribute.docid','distribute','distribute')
+   mapResource('lr.push_to.docid', 'push_to', 'distribute')
    if not LRNode.nodeDescription.gateway_node:
        mapResource('lr.publish.docid', 'publish','publish')
        mapResource('lr.obtain.docid', 'obtain','obtain')
diff --git a/LR/lr/controllers/distribute.py b/LR/lr/controllers/distribute.py
index 429e3b87..03cbdf63 100755
--- a/LR/lr/controllers/distribute.py
+++ b/LR/lr/controllers/distribute.py
@@ -56,7 +56,31 @@ def destination(self):
         log.info("received distribute request...returning: \n"+pprint.pformat(response, 4))
         return json.dumps(response)
 
+
+    def push_to(self):
+
+        log.debug("\n\nPush request: {0}\n\n".format(pprint.pformat(request.body, indent=2)))
+        pushOptions = json.loads(request.body)
+
+        replicationOptions={'filter':ResourceDataModel.REPLICATION_FILTER,
+                            'source':self.resource_data,
+                            'target': pushOptions['destination_database_url'],
+                            'query_params': pushOptions}
+
+        req= urllib2.Request(urlparse.urljoin(appConfig['couchdb.url'], '_replicator'),
+                             headers={'Content-Type':'application/json' },
+                             data = json.dumps(replicationOptions))
+
+        log.info("\n\nPush started\nSource:{0}\nDestination:{1}\nArgs:{2}".format(
+                            self.resource_data,
+                            pushOptions['destination_database_url'],
+                            pprint.pformat(replicationOptions)))
+
+        results = json.load(urllib2.urlopen(req))
+        log.debug("Replication results: " + pprint.pformat(results))
+        return json.dumps(results)
+
 
     def _getDistinationInfo(self, connection):
         # Make sure we only have one slash in the url path.  More than one
         #confuses pylons routing libary.
diff --git a/LR/lr/tests/functional/distribute/lr_node.py b/LR/lr/tests/functional/distribute/lr_node.py
index 1259529c..5bddf77a 100644
--- a/LR/lr/tests/functional/distribute/lr_node.py
+++ b/LR/lr/tests/functional/distribute/lr_node.py
@@ -28,6 +28,7 @@
 import urllib2
 import urlparse
 from services.Resource_Data_Distribution import __ResourceDataDistributionServiceTemplate as DistributeServiceTemplate
+from services.Push_To_Distribution import __PushToDistributionServiceTemplate as PushToServiceTemplate
 import subprocess
 from lr.lib import helpers as h
 from time import sleep
@@ -67,6 +68,7 @@ def __init__(self, nodeConfig, nodeName, communityId=None, networkId=None):
         self._setupDescriptions()
         self._setupNode()
         self._setupDistributeService()
+        self._setupPushToService()
         self.setNodeInfo(nodeName)
         if communityId is not None:
             self.setCommunityInfo(communityId)
@@ -75,7 +77,8 @@ def __init__(self, nodeConfig, nodeName, communityId=None, networkId=None):
             self.removeTestLog()
         # Keep around the replication documents that are store in the replication
         # database so that they can be deleted when the node is teared down.
-        self._distributeResultsList = []
+        self._replicationIdList = []
+
 
     def _setupFilePaths(self):
@@ -113,6 +116,9 @@ def removeTestLog(self):
         except:
             pass
 
+    def getResourceDataDbUrl(self):
+        return urlparse.urljoin(self._nodeConfig.get("couch_info", "server"),
+                                self._nodeConfig.get("couch_info", "resourcedata"))
     def _setupNode(self):
         #create the couch db databases
         self._server = couchdb.Server(url=self._nodeConfig.get("couch_info", "server"))
@@ -136,6 +142,20 @@ def _setupDistributeService(self):
         setup_utils.PublishDoc(self._server, self._nodeConfig.get("couch_info", "node") ,
                                doc["service_type"]+":Resource Data Distribution service", doc)
 
+    def _setupPushToService(self):
+        custom_opts = {}
+        custom_opts["node_endpoint"] = self._getNodeUrl()
+        custom_opts["service_id"] = uuid.uuid4().hex
+        custom_opts["active"] = True
+
+        must = PushToServiceTemplate()
+        config_doc = must.render(**custom_opts)
+
+        doc = json.loads(config_doc)
+        setup_utils.PublishDoc(self._server, self._nodeConfig.get("couch_info", "node") ,
+                               doc["service_type"]+":Push To Distribution service", doc)
+
+
     def _setupPylonsConfig(self):
         #Read the original configuration and update with the test node data.
         pylonsConfig = ConfigParser.ConfigParser()
@@ -288,6 +308,19 @@ def waitOnChangeMonitor(self):
         except Exception as e:
             log.exception(e)
             print(e)
+
+    def waitOnReplicationById(self, replicationId, destinationNode=None):
+        while(True):
+            response = urllib2.urlopen(self._replicatorUrl+'/'+replicationId)
+            doc = json.load(response)
+            response.close()
+            print('\n\n---------------Replication Status-----------')
+            print('--Replication:{1}\n<=From node:\t{0}\n=>To node:\t{2}\n<=>completion status: \t{3}\n'.format(
+                self._nodeName, replicationId, str(destinationNode), doc.get('_replication_state')))
+            if doc.get('_replication_state') == 'completed':
+                break
+            sleep(30)
+
     def waitOnReplication(self, distributeResults):
         """Wait for the replication to complete on the all node connections
         specifed
@@ -298,30 +331,35 @@ def waitOnReplication(self, distributeResults):
             print ("node {0} has no replication results or distributable docs ....".format(self._nodeName))
             return
 
-        waiting = True
-        while(waiting):
-            # Set waiting to false and check to the replication document to see
-            # if replication not completed for any of the connections, if not then
-            # reset waiting to true.
-            waiting =False
-            for connectionResults in distributeResults['connections']:
-                if 'replication_results' not in connectionResults:
-                    continue
-                #Get the replication document.
-                response = urllib2.urlopen(self._replicatorUrl+'/'+connectionResults['replication_results']['id'])
-                doc = json.load(response)
-                response.close()
-
-                print('\n\n---------------Replication Status-----------')
-                print('<=From node:\t{0}\n=>To node:\t{1}\n<=>completion status: \t{2}\n'.format(
-                    self._nodeName,
-                    connectionResults['destinationNodeInfo']['resource_data_url'].split('/')[-1].split('_resource_data')[0],
-                    doc.get('_replication_state')))
-
-                if doc.get('_replication_state') != 'completed':
-                    waiting = True
-                    sleep(30)
-                    continue
+        for connectionResults in distributeResults['connections']:
+            if 'replication_results' not in connectionResults:
+                continue
+            self._replicationIdList.append(connectionResults['replication_results']['id'])
+            self.waitOnReplicationById(connectionResults['replication_results']['id'],
+                connectionResults['destinationNodeInfo']['resource_data_url'].split('/')[-1].split('_resource_data')[0])
+
+    def pushTo(self, destinationNode, predicateFunc=None):
+        """Push to destination node. Use destination node filter if it has a filter"""
+
+        pushToOptions = {"destination_database_url": destinationNode.getResourceDataDbUrl()}
+
+        if destinationNode._nodeFilterDescription is not None:
+            pushToOptions['filter'] = destinationNode._nodeFilterDescription["filter"]
+            pushToOptions['include_exclude'] = destinationNode._nodeFilterDescription["include_exclude"]
+
+        if predicateFunc is not None:
+            pushToOptions['predicate'] = predicateFunc
+
+        request = urllib2.Request(self._getNodeUrl()+"/push_to",
+                                  headers={'Content-Type':'application/json' },
+                                  data = json.dumps(pushToOptions))
+
+        results = json.load(urllib2.urlopen(request))
+        self._replicationIdList.append(results['id'])
+        self.waitOnReplicationById(results['id'], destinationNode._nodeName)
+        print("Push to response: \n{0}".format(pprint.pformat(results)))
+
+        return results
 
     def distribute(self, waitOnReplication=True):
         """ Distribute to all the node connections.
            When waited for completion is
@@ -333,15 +371,16 @@ def distribute(self, waitOnReplication=True):
                                   data, {'Content-Type':'application/json; charset=utf-8'})
 
-        self._distributeResultsList.append(json.load(urllib2.urlopen(request)))
-        print("Distribute reponse: \n{0}".format(pprint.pformat(self._distributeResultsList[-1])))
+        results = json.load(urllib2.urlopen(request))
+        print("Distribute response: \n{0}".format(pprint.pformat(results)))
 
         if waitOnReplication:
-            self.waitOnReplication(self._distributeResultsList[-1])
-
-        return self._distributeResultsList[-1]
+            self.waitOnReplication(results)
+
+        return results
 
-    def getResourceDataDocs(self, filter_description=None, doc_type='resource_data', include_docs=True):
+    def getResourceDataDocs(self, filter_description=None, predicate=None,
+                            doc_type='resource_data', include_docs=True):
 
         db = self._server[self._nodeConfig.get("couch_info", "resourcedata")]
         #For source node get all the resource_data documents using the filter
@@ -351,17 +390,19 @@ def getResourceDataDocs(self, filter_description=None, doc_type='resource_data',
                    "doc_type":doc_type}
         if filter_description is not None:
             options["filter_description"] = json.dumps(filter_description)
+        if predicate is not None:
+            options["predicate"] = predicate
 
         return db.changes(**options)["results"]
 
-    def compareDistributedResources(self, destination, filter_description=None):
+    def compareDistributedResources(self, destination, filter_description=None, predicate=None):
         """This method considers this node as source node.  It compares its
         resource_data document with the destionation node to verify that
         data was distributed.  This comparison assumes that distribute/
         replication is done and that there is no other additions or deletions
         the nodes that are being compared"""
 
-        sourceResults = self.getResourceDataDocs(filter_description)
+        sourceResults = self.getResourceDataDocs(filter_description, predicate)
         destinationResults = destination.getResourceDataDocs()
 
         #check the number of source document is the same at destination.
@@ -459,32 +500,28 @@ def removeReplicationDocs(self):
        replication document stated as completed with the same source and target
        database name eventhough those the document is about database thas been
        deleted and recreated.
        '''
-        for distributeResults in self._distributeResultsList:
-            if ('connections' in distributeResults) == False:
-                continue
-            for connectionResults in distributeResults['connections']:
-                if 'replication_results' in connectionResults:
-                    try:
-                        #Use urllib request to remove the replication documents, the python
-                        #couchdb interface has a bug at time of this write that prevents access
-                        # to the _replicator database.
-
-                        #first get the lastest version of the doc
-                        response = urllib2.urlopen(self._replicatorUrl+'/'+connectionResults['replication_results']['id'])
-                        doc = json.load(response)
-                        response.close()
-                        print ("\n\n--node {0} deleting replication doc: {1}".format(
-                            self._nodeName,
-                            self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev'])))
-
-                        request = urllib2.Request(self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev']),
-                                                  headers={'Content-Type':'application/json' })
+        for replicationId in self._replicationIdList:
+            try:
+                #Use urllib request to remove the replication documents, the python
+                #couchdb interface has a bug at the time of this writing that prevents access
+                # to the _replicator database.
+
+                #first get the latest version of the doc
+                response = urllib2.urlopen(self._replicatorUrl+'/'+replicationId)
+                doc = json.load(response)
+                response.close()
+                print ("\n\n--node {0} deleting replication doc: {1}".format(
+                    self._nodeName,
+                    self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev'])))
+
+                request = urllib2.Request(self._replicatorUrl+'/{0}?rev={1}'.format(doc['_id'], doc['_rev']),
+                                          headers={'Content-Type':'application/json' })
+
+                request.get_method = lambda: "DELETE"
-                        request.get_method = lambda: "DELETE"
-
-                        urllib2.urlopen(request)
-                    except Exception as e:
-                        log.exception(e)
+                urllib2.urlopen(request)
+            except Exception as e:
+                log.exception(e)
 
     def tearDown(self):
         self.stop()
diff --git a/LR/lr/tests/functional/distribute/test_distribute.py b/LR/lr/tests/functional/distribute/test_distribute.py
index 1856a1d1..ca9b8d19 100644
--- a/LR/lr/tests/functional/distribute/test_distribute.py
+++ b/LR/lr/tests/functional/distribute/test_distribute.py
@@ -14,6 +14,7 @@
 import json
 from time import sleep
 import pprint
+import urllib2
 
 _PWD = path.abspath(path.dirname(__file__))
 _TEST_DATA_PATH = path.abspath(path.join(_PWD, "../../data/nsdl_dc/data-000000000.json"))
@@ -49,8 +50,7 @@ def setUp(self):
         for node in self._NODES:
             node.stop()
             node.resetResourceData()
-
-
+
     def _setupNodePair(self, sourceNode, destinationNode,
                        sourceCommunityId="DistributeTestCommunity",
                        destinationCommunityId="DistributeTestCommunity",
@@ -63,7 +63,8 @@ def _setupNodePair(self, sourceNode, destinationNode,
                        sourceIsActive = True,
                        destinationIsActive = True,
                        destinationFilter = None,
-                       isGatewayConnection=None):
+                       isGatewayConnection=None,
+                       addConnection = True):
         #Set the community id
         sourceNode.setCommunityInfo(sourceCommunityId, sourceIsSocialCommunity)
         destinationNode.setCommunityInfo(destinationCommunityId, destinationIsSocialCommunity)
@@ -81,13 +82,22 @@ def _setupNodePair(self, sourceNode, destinationNode,
             destinationNode.setFilterInfo(**destinationFilter)
 
         #add the destination node as connection to the source node.
-        if isinstance(isGatewayConnection, bool):
-            sourceNode.addConnectionTo(destinationNode._getNodeUrl(), isGatewayConnection)
-        else:
-            sourceNode.addConnectionTo(destinationNode._getNodeUrl(), (sourceIsGateway or destinationIsGateway))
-
+        if addConnection:
+            if isinstance(isGatewayConnection, bool):
+                sourceNode.addConnectionTo(destinationNode._getNodeUrl(), isGatewayConnection)
+            else:
+                sourceNode.addConnectionTo(destinationNode._getNodeUrl(), (sourceIsGateway or destinationIsGateway))
+
+    def _doPushToTest(self, sourceNode, destinationNode, predicateFunc=None):
+
+        sourceNode.start()
+        destinationNode.start()
+        results = sourceNode.pushTo(destinationNode, predicateFunc)
+        destinationNode.waitOnChangeMonitor()
+        return results
+
     def _doDistributeTest(self, sourceNode, destinationNode):
         #start the node nodes.
         sourceNode.start()
@@ -169,30 +179,44 @@ def test_gatewaynodes_on_same_communities_different_network(self):
         """Distribute between two gateway nodes on the same community but
             different network and no filter on the destination node."""
 
-
-    def _setup_common_nodes_same_network_and_community_filter(self,
-                                                              include_exclude=True,
-                                                              count=50,
-                                                              mode=5):
+    def _pushMarkedData(self, sourceNode, count, mode):
+        #populate the node with test data.
+        data = json.load(file(_TEST_DATA_PATH))
+        #Add some marker documents that will be filtered in.
+        for i in range(count):
+            data["documents"][i]["keys"].append("Filter Me In")
+            if i % mode == 0:
+                data["documents"][i]["active"] = False
+
+        sourceNode.publishResourceData(data["documents"])
+
+    def _setup_with_filter(self, include_exclude=True, count=50, mode=5, addConnection=True):
+
         sourceNode =self._NODES[0]
         destinationNode = self._NODES[1]
+
         filterDescription = {"include_exclude": include_exclude,
                              "filter":[{"filter_key":"keys", "filter_value":"Filter Me In"},
                                        {"filter_key":"active", "filter_value":"True"},
                                        ]}
 
-        self._setupNodePair(sourceNode, destinationNode, destinationFilter=filterDescription)
+        self._setupNodePair(sourceNode, destinationNode, destinationFilter=filterDescription,
+                            addConnection=addConnection)
+
+        self._pushMarkedData(sourceNode, count, mode)
+
+
+    def _setup_common_nodes_same_network_and_community_filter(self,
+                                                              include_exclude=True,
+                                                              count=50,
+                                                              mode=5,
+                                                              addConnection=True):
+
+        sourceNode =self._NODES[0]
+        destinationNode = self._NODES[1]
 
-        #populate the node with test data.
-        data = json.load(file(_TEST_DATA_PATH))
-        #Add some marker documents that will be filter in.
-        for i in range(count):
-            data["documents"][i]["keys"].append("Filter Me In")
-            if i % mode == 0:
-                data["documents"][i]["active"] = False
-
-        sourceNode.publishResourceData(data["documents"])
+        self._setup_with_filter(include_exclude, count, mode, addConnection)
 
         response =self._doDistributeTest(sourceNode, destinationNode)
 
@@ -332,4 +356,98 @@ def test_source_node_with_more_than_two_gateway_connections(self):
         """There should be NO distribution/replication. Source node connections
         are invalid"""
 
+    def test_push_distribute_no_filter_or_predicate(self):
+        """Test push to distribution with no filter or predicate function """
+        sourceNode =self._NODES[0]
+        destinationNode = self._NODES[1]
+
+        self._setupNodePair(sourceNode, destinationNode, addConnection=False)
+
+        #populate the node with test data.
+        data = json.load(file(_TEST_DATA_PATH))
+        sourceNode.publishResourceData(data["documents"])
+
+        response = self._doPushToTest(sourceNode, destinationNode)
+        assert (response[self.__OK]), "Replication failed..."
+        assert sourceNode.compareDistributedResources(destinationNode), \
+            """Fail to push document to destination node."""
+
+    def test_push_distribute_with_filter_in(self):
+        """ Test push to distribution with filter in and no predicate function """
+
+        sourceNode =self._NODES[0]
+        destinationNode = self._NODES[1]
+
+        self._setup_with_filter(addConnection=False)
+        response = self._doPushToTest(sourceNode, destinationNode)
+
+        assert (response[self.__OK]), "Replication failed..."
+        assert sourceNode.compareDistributedResources(destinationNode,
+                                                      destinationNode._nodeFilterDescription), \
+            """Push to distribution with filter in failed ... """
+    def test_push_distribute_with_filter_out(self):
+        """ Test push to distribution with filter out and no predicate function """
+
+        sourceNode =self._NODES[0]
+        destinationNode = self._NODES[1]
+
+        self._setup_with_filter(addConnection=False, include_exclude=False, count=100, mode=2)
+        response = self._doPushToTest(sourceNode, destinationNode)
+
+        assert (response[self.__OK]), "Replication failed..."
+        assert sourceNode.compareDistributedResources(destinationNode,
+                                                      destinationNode._nodeFilterDescription), \
+            """Push to distribution with filter out failed ... """
""" + + def test_push_distribute_with_predicate(self): + """ Test push to distribution with filter and predicate function """ + + sourceNode =self._NODES[0] + destinationNode = self._NODES[1] + self._setupNodePair(sourceNode, destinationNode, addConnection=False) + self._pushMarkedData(sourceNode, 50, 5) + + predicate = """function(doc){ + if(doc.active) + { + return true; + } + return false; + } + """ + destinationNode._nodeFilterDescription = None + response = self._doPushToTest(sourceNode, destinationNode, predicate) + + assert (response[self.__OK]), "Replication failed..." + assert sourceNode.compareDistributedResources(destinationNode, + destinationNode._nodeFilterDescription, predicate), \ + """Push to distribution with filter in failed ... """ + + def test_push_distribute_with_filter_and_predicate(self): + """ Test push to distribution with predicate function """ + + sourceNode =self._NODES[0] + destinationNode = self._NODES[1] + self._setupNodePair(sourceNode, destinationNode, addConnection=False) + self._pushMarkedData(sourceNode, 50, 5) + + destinationFilter = {"include_exclude": False, + "filter":[{"filter_key":"keys", "filter_value":"Filter Me In"}], + } + destinationNode.setFilterInfo(**destinationFilter ) + + predicate = """function(doc){ + if(doc.active) + { + return true; + } + return false; + } + """ + response = self._doPushToTest(sourceNode, destinationNode, predicate) + + assert (response[self.__OK]), "Replication failed..." + assert sourceNode.compareDistributedResources(destinationNode, + destinationNode._nodeFilterDescription, predicate), \ + """Push to distribution with filter and predicate failed ... """ diff --git a/LR/test.xml b/LR/test.xml deleted file mode 100644 index 212435db..00000000 --- a/LR/test.xml +++ /dev/null @@ -1,19 +0,0 @@ - - - - 1.3 - - Learning Registry - - Learning Registry - application/json - http://www.learningregistry.org/tos/ - Learning Registry - - - - diff --git a/config/services/Push_To_Distribution.py b/config/services/Push_To_Distribution.py new file mode 100644 index 00000000..84ea563f --- /dev/null +++ b/config/services/Push_To_Distribution.py @@ -0,0 +1,65 @@ +''' +Created on Nov 29, 2011 + +@author: jpoyau +''' +from service_template import ServiceTemplate +from setup_utils import getInput, PublishDoc, isBoolean, YES, isInt +import pystache, uuid +import json + + + +def install(server, dbname, setupInfo): + custom_opts = {} + active = getInput("Enable push to distribution service?", "N", isBoolean) + custom_opts["active"] = active.lower() in YES + + custom_opts["node_endpoint"] = setupInfo["nodeUrl"] + custom_opts["service_id"] = uuid.uuid4().hex + + must = __PushToDistributionServiceTemplate() + config_doc = must.render(**custom_opts) + print config_doc + doc = json.loads(config_doc) + PublishDoc(server, dbname,doc["service_type"]+":Push To Distribution service", doc) + print("Configured Push To distribute service:\n{0}\n".format(json.dumps(doc, indent=4, sort_keys=True))) + + + +class __PushToDistributionServiceTemplate(ServiceTemplate): + def __init__(self): + ServiceTemplate.__init__(self) + self.service_data_template = '''{}''' + + def _optsoverride(self): + opts = { + "active": "false", + "service_type": "distribute", + "service_name": "Push To Distribute", + "service_version": "0.23.0", + "service_endpoint": "/push_to", + "service_key": "false", + "service_https": "false", + } + return opts + +if __name__ == "__main__": + import couchdb + + nodeSetup = { + 'couchDBUrl': "http://localhost:5984", + 'nodeUrl': 
"http://test.example.com" + } + + def doesNotEndInSlash(input=None): + return input is not None and input[-1] != "/" + + def notExample(input=None): + return input is not None and input != nodeSetup["nodeUrl"] + + nodeSetup["couchDBUrl"] = getInput("Enter the CouchDB URL:", nodeSetup["couchDBUrl"], doesNotEndInSlash) + nodeSetup["nodeUrl"] = getInput("Enter the public URL of the LR Node", nodeSetup["nodeUrl"], notExample) + + server = couchdb.Server(url= nodeSetup['couchDBUrl']) + install(server, "node", nodeSetup) diff --git a/config/setup_node.py b/config/setup_node.py index a5ea8df1..15f6aba6 100755 --- a/config/setup_node.py +++ b/config/setup_node.py @@ -58,7 +58,9 @@ "SWORD APP Publish V1.3"], "broker":[], "distribute": - ["Resource Data Distribution"], + ["Resource Data Distribution", + "Push To Distribution", + ], "publish": ["Basic Publish"]} @@ -72,7 +74,9 @@ "Network Node Status", "Resource Distribution Network Policy"], "distribute": - ["Resource Data Distribution"] + ["Resource Data Distribution", + "Push To Distribution" + ] } def makePythonic(text): return re.sub('''[ \.]''', "_", text) diff --git a/couchdb/resource_data/apps/filtered-replication/filters/change_feed_filter.js b/couchdb/resource_data/apps/filtered-replication/filters/change_feed_filter.js index 7bb87177..de017e23 100644 --- a/couchdb/resource_data/apps/filtered-replication/filters/change_feed_filter.js +++ b/couchdb/resource_data/apps/filtered-replication/filters/change_feed_filter.js @@ -16,14 +16,43 @@ function(doc, req){ } // If there is no filter in the parameter just send the document. - if (!req.query.filter_description){ - log("No filter available..."); + if (!req.query.filter_description && !req.query.predicate){ + log("No filter or predicate available..."); return true; } - filter_description = JSON.parse(req.query.filter_description); + + + //check if there is filter predicate function. + var predicate_test = true; + if (req.query.predicate){ + try{ + predicate_func = eval(req.query.predicate); + predicate_test = predicate_func(doc); + } + catch (err){ + log("predicate fuction error"+err); + predicate_test = false; + } + } + if (predicate_test == false) + { + log("doc failed predicate test ... returning false") + return false; + } + + var filter_description; + if (req.query.filter_description){ + filter_description = JSON.parse(req.query.filter_description); + } + else + { + log("no filter ... returning predicate result"+predicate_test) + return predicate_test + } // Check to see the query parameter is valid node filter description. if not // we can filter anything out so send it. 
- if(("custom_filter" in filter_description) && filter_description.custom_filter == true){ + if(("custom_filter" in filter_description) && filter_description.custom_filter == true && + predicate_test == true){ log("No filtering needed custom filter is being used"); return true; } @@ -90,6 +119,11 @@ function(doc, req){ log("rejecting document because it matches exclude filter...\n") return false; } + if (predicate_test == false) + { + log("rejecting document because it failed predicate test ...\n"); + return false; + } log("The document just match everything...\n"); return true; } diff --git a/couchdb/resource_data/apps/filtered-replication/filters/replication_filter.js b/couchdb/resource_data/apps/filtered-replication/filters/replication_filter.js index c3fa6569..2b23ecd5 100644 --- a/couchdb/resource_data/apps/filtered-replication/filters/replication_filter.js +++ b/couchdb/resource_data/apps/filtered-replication/filters/replication_filter.js @@ -1,7 +1,7 @@ function(doc, req){ // Don't send the design document. if ( !doc || (doc.doc_type != "resource_data_distributable")){ - //print("Ignore document that is not resource_data_distributable: "+ doc); + //log("Ignore document that is not resource_data_distributable: "+ doc); return false; } //Check if the document say has does distribute field. if so don't send it @@ -10,27 +10,40 @@ function(doc, req){ } // If there is no query parameters return send the document. if (!req){ - //print("no req value"); + //log("no req value"); return true; } // Check to see the query parameter is valid node filter description. if not // we can filter anything out so send it. if(("custom_filter" in req) && req.custom_filter == true){ - //print("No filtering needed custom filter is being used"); + //log("No filtering needed custom filter is being used"); return true; } // If there is no filter in the parameter just send the document. var filter_doc = req.query; if (!filter_doc){ - //print("No filter available..."); + //log("No filter available..."); return true; } + //check if there is filter predicate function. + var predicate_test = true; + if ('predicate' in filter_doc){ + try{ + predicate_func = eval(filter_doc.predicate); + predicate_test = predicate_func(doc); + } + catch (err){ + log("predicate fuction error"+err); + predicate_test = false; + } + } + // Variable to hold if the document is filter out on match. // If include_exclude is T if the filters describe what documents to accept // all others are rejected F if the filters describe what documents to reject // all others are accepted optional, T if not present var include_doc = (!("include_exclude" in filter_doc) ||filter_doc.include_exclude==true); - //print("Include doc on match: "+include_doc); + //log("Include doc on match: "+include_doc); //Keep track the see if the document match all the filters. var match_all_filters = true; @@ -43,7 +56,7 @@ function(doc, req){ var filter_match = null; if (!filter_key || !filter_value){ - //print("Continuing the key/value loop... invalid filter_key: "+filter_key+" or filter_value: "+filter_value ); + //log("Continuing the key/value loop... 
invalid filter_key: "+filter_key+" or filter_value: "+filter_value ); continue; } //create a regular expression for the filter_key to check for matching against @@ -53,39 +66,43 @@ function(doc, req){ //Create regular expression for the key value var regex_value = new RegExp(filter_value); - //print("Filter key: "+filter_key+"\tFilter value: "+filter_value+"\t key regx: "+regex_key); + //log("Filter key: "+filter_key+"\tFilter value: "+filter_value+"\t key regx: "+regex_key); //Look though the keys and for the variable that matches the filter key for (var key in doc){ - //print("key: "+key); + //log("key: "+key); if (!key.match(regex_key)){ continue; } matched_value = doc[key]; - //print("find a match: '"+matched_value+"' for filter key: "+filter_key); + //log("find a match: '"+matched_value+"' for filter key: "+filter_key); //Make we have a valid data for filter_value and matched value // otherwise keep looping. if (!matched_value){ - //print("The matched_value is bad: '"+matched_value +"'...keep going\n"); + //log("The matched_value is bad: '"+matched_value +"'...keep going\n"); continue; } //Check if there is match filter_match = JSON.stringify(matched_value).match(regex_value); match_all_filters = match_all_filters && (filter_match != null); - //print("The match between '"+regex_value+"' and '"+matched_value+" is: "+filter_match+"\n"); + //log("The match between '"+regex_value+"' and '"+matched_value+" is: "+filter_match+"\n"); //Exclude the document if there is a match if((include_doc == true && !filter_match )){ - //print("There is no match rejecting document..."); + //log("There is no match rejecting document..."); return false; } } } - if( (include_doc == false) &&(match_all_filters == true)) + if((include_doc == false) &&(match_all_filters == true)) { - //print("rejecting document because it matches exclude filter...\n") + //log("rejecting document because it matches exclude filter...\n") + return false; + } + if(predicate_test == false){ + //log("rejecting document because it failed predicate test ...\n"); return false; } - //print("The document just match everything...\n"); + //log("The document just match everything...\n"); return true; }