Commit 526cf08

Stale removal via timestamp (#235)
* Neo4jStalenessRemovalTask to support removal via timestamp
* Flake8
* Update
* More unit test
* Update
* Flake 8
* Update
* Add doc
1 parent 2ee56a6 commit 526cf08

File tree

4 files changed (+468 -44 lines changed)

README.md

Lines changed: 63 additions & 0 deletions
@@ -445,3 +445,66 @@ With this pattern RestApiQuery supports 1:1 and 1:N JOIN relationship.
(GROUP BY or any other aggregation, sub-query join is not supported)

To see in action, take a peek at [ModeDashboardExtractor](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/extractor/dashboard/mode_dashboard_extractor.py)

### Removing stale data in Neo4j -- [Neo4jStalenessRemovalTask](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/task/neo4j_staleness_removal_task.py):

As Databuilder ingestion mostly consists of INSERT and UPDATE operations, data that has been removed from the metadata source can remain in the Neo4j database as stale data. Neo4jStalenessRemovalTask detects this staleness and removes it.

In [Neo4jCsvPublisher](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/publisher/neo4j_csv_publisher.py), the attributes "published_tag" and "publisher_last_updated_epoch_ms" are added to every node and relation. You can use either of these two attributes to detect staleness and remove stale nodes or relations from the database.
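
Either attribute can be inspected directly in Neo4j. A minimal sketch, assuming your graph contains Table nodes as in the examples below:

    MATCH (n:Table)
    RETURN n.published_tag, n.publisher_last_updated_epoch_ms
    LIMIT 5;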

#### Using "published_tag" to remove stale data

Use *published_tag* to remove stale data when you can be certain that, once all ingestion has completed, any non-matching tag is stale. For example, suppose you use the current date (or the execution date in Airflow) as the *published_tag*, "2020-03-31". Once Databuilder has ingested all tables and all columns, every table node and column node should carry the *published_tag* "2020-03-31". It is then safe to assume that any table or column node with a different *published_tag* -- such as "2020-03-30" or "2020-02-10" -- has been deleted from the source metadata. You can use Neo4jStalenessRemovalTask to delete that stale data.

    from pyhocon import ConfigFactory

    from databuilder.job.job import DefaultJob
    from databuilder.task.neo4j_staleness_removal_task import Neo4jStalenessRemovalTask

    task = Neo4jStalenessRemovalTask()
    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        'task.remove_stale_data.staleness_max_pct': 10,
        'task.remove_stale_data.target_nodes': ['Table', 'Column'],
        'task.remove_stale_data.job_publish_tag': '2020-03-31'
    }
    job_config = ConfigFactory.from_dict(job_config_dict)
    job = DefaultJob(conf=job_config, task=task)
    job.launch()

Note that there is a protection mechanism, **staleness_max_pct**, that guards your data from being wiped out when something is clearly wrong. It first measures the proportion of elements that would be deleted and, if that proportion exceeds the threshold for any type (10% in the configuration above), the deletion is not executed and the task aborts.
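
There is also a per-label/type override, *staleness_max_pct_dict* (STALENESS_PCT_MAX_DICT in the task source below). A minimal sketch, assuming the dict maps a node label or relation type to its own maximum staleness percentage; the 'Column' threshold here is hypothetical:

    # Sketch only: assumes staleness_max_pct_dict maps a label/type to its own
    # maximum stale percentage; other keys are as in the example above.
    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        'task.remove_stale_data.staleness_max_pct': 10,
        # Hypothetical override: Column nodes churn more, so allow up to 30%
        'task.remove_stale_data.staleness_max_pct_dict': {'Column': 30},
        'task.remove_stale_data.target_nodes': ['Table', 'Column'],
        'task.remove_stale_data.job_publish_tag': '2020-03-31'
    }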

#### Using "publisher_last_updated_epoch_ms" to remove stale data

You can think of this approach as TTL-based eviction. It is particularly useful when there are multiple ingestion pipelines and you cannot be sure when all ingestion is done. In that case, you can still say that if a specific node or relation has not been published in the past 3 days, it is stale data.

    task = Neo4jStalenessRemovalTask()
    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        'task.remove_stale_data.staleness_max_pct': 10,
        'task.remove_stale_data.target_relations': ['READ', 'READ_BY'],
        'task.remove_stale_data.milliseconds_to_expire': 86400000 * 3
    }
    job_config = ConfigFactory.from_dict(job_config_dict)
    job = DefaultJob(conf=job_config, task=task)
    job.launch()

The configuration above deletes stale usage relations (READ, READ_BY) by removing any READ or READ_BY relation that has not been published in the past 3 days (86400000 milliseconds is one day, so 86400000 * 3 is three days). If the number of elements to be removed exceeds 10% for any type, the task aborts without executing any deletion.

#### Dry run

Deletion is always scary, and it is better to perform a dry run before putting this into action. You can use dry run to see what sort of Cypher query will be executed.

    task = Neo4jStalenessRemovalTask()
    job_config_dict = {
        'job.identifier': 'remove_stale_data_job',
        'task.remove_stale_data.neo4j_endpoint': neo4j_endpoint,
        'task.remove_stale_data.neo4j_user': neo4j_user,
        'task.remove_stale_data.neo4j_password': neo4j_password,
        'task.remove_stale_data.staleness_max_pct': 10,
        'task.remove_stale_data.target_relations': ['READ', 'READ_BY'],
        'task.remove_stale_data.milliseconds_to_expire': 86400000 * 3,
        'task.remove_stale_data.dry_run': True
    }
    job_config = ConfigFactory.from_dict(job_config_dict)
    job = DefaultJob(conf=job_config, task=task)
    job.launch()
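
For reference, the dry run for the configuration above logs the Cypher it would have executed. Based on the statement templates in neo4j_staleness_removal_task.py, the relation-deletion query would look roughly like this (rendered here for the READ type; $marker is bound to the expiration marker and $batch_size to the configured batch size):

    MATCH ()-[n:READ]-()
    WHERE n.publisher_last_updated_epoch_ms < $marker
    OR NOT EXISTS(n.publisher_last_updated_epoch_ms)
    WITH n LIMIT $batch_size
    DELETE n
    RETURN count(*) as count;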

databuilder/task/neo4j_staleness_removal_task.py

Lines changed: 84 additions & 43 deletions
@@ -1,15 +1,15 @@
 import logging
+import textwrap
 import time
 
-from neo4j.v1 import GraphDatabase, BoltStatementResult  # noqa: F401
+from neo4j import GraphDatabase  # noqa: F401
 from pyhocon import ConfigFactory  # noqa: F401
 from pyhocon import ConfigTree  # noqa: F401
 from typing import Dict, Iterable, Any  # noqa: F401
 
 from databuilder import Scoped
-from databuilder.task.base_task import Task  # noqa: F401
 from databuilder.publisher.neo4j_csv_publisher import JOB_PUBLISH_TAG
-
+from databuilder.task.base_task import Task  # noqa: F401
 
 
 # A end point for Neo4j e.g: bolt://localhost:9999
 NEO4J_END_POINT_KEY = 'neo4j_endpoint'
@@ -20,20 +20,28 @@
 TARGET_NODES = "target_nodes"
 TARGET_RELATIONS = "target_relations"
 BATCH_SIZE = "batch_size"
+DRY_RUN = "dry_run"
 # Staleness max percentage. Safety net to prevent majority of data being deleted.
 STALENESS_MAX_PCT = "staleness_max_pct"
 # Staleness max percentage per LABEL/TYPE. Safety net to prevent majority of data being deleted.
 STALENESS_PCT_MAX_DICT = "staleness_max_pct_dict"
+# Using this milliseconds and published timestamp to determine staleness
+MS_TO_EXPIRE = "milliseconds_to_expire"
+MIN_MS_TO_EXPIRE = "minimum_milliseconds_to_expire"
 
 DEFAULT_CONFIG = ConfigFactory.from_dict({BATCH_SIZE: 100,
                                           NEO4J_MAX_CONN_LIFE_TIME_SEC: 50,
                                           STALENESS_MAX_PCT: 5,
                                           TARGET_NODES: [],
                                           TARGET_RELATIONS: [],
-                                          STALENESS_PCT_MAX_DICT: {}})
+                                          STALENESS_PCT_MAX_DICT: {},
+                                          MIN_MS_TO_EXPIRE: 86400000,
+                                          DRY_RUN: False})
 
 LOGGER = logging.getLogger(__name__)
 
+MARKER_VAR_NAME = 'marker'
+
 
 class Neo4jStalenessRemovalTask(Task):
     """
@@ -55,22 +63,33 @@ def get_scope(self):
 
     def init(self, conf):
         # type: (ConfigTree) -> None
-        conf = Scoped.get_scoped_conf(conf, self.get_scope())\
-            .with_fallback(conf)\
+        conf = Scoped.get_scoped_conf(conf, self.get_scope()) \
+            .with_fallback(conf) \
             .with_fallback(DEFAULT_CONFIG)
         self.target_nodes = set(conf.get_list(TARGET_NODES))
         self.target_relations = set(conf.get_list(TARGET_RELATIONS))
         self.batch_size = conf.get_int(BATCH_SIZE)
+        self.dry_run = conf.get_bool(DRY_RUN)
         self.staleness_pct = conf.get_int(STALENESS_MAX_PCT)
         self.staleness_pct_dict = conf.get(STALENESS_PCT_MAX_DICT)
-        self.publish_tag = conf.get_string(JOB_PUBLISH_TAG)
+
+        if JOB_PUBLISH_TAG in conf and MS_TO_EXPIRE in conf:
+            raise Exception('Cannot have both {} and {} in job config'.format(JOB_PUBLISH_TAG, MS_TO_EXPIRE))
+
+        self.ms_to_expire = None
+        if MS_TO_EXPIRE in conf:
+            self.ms_to_expire = conf.get_int(MS_TO_EXPIRE)
+            if self.ms_to_expire < conf.get_int(MIN_MS_TO_EXPIRE):
+                raise Exception('{} is too small'.format(MS_TO_EXPIRE))
+            self.marker = '(timestamp() - {})'.format(conf.get_int(MS_TO_EXPIRE))
+        else:
+            self.marker = conf.get_string(JOB_PUBLISH_TAG)
+
         self._driver = \
             GraphDatabase.driver(conf.get_string(NEO4J_END_POINT_KEY),
                                  max_connection_life_time=conf.get_int(NEO4J_MAX_CONN_LIFE_TIME_SEC),
                                  auth=(conf.get_string(NEO4J_USER), conf.get_string(NEO4J_PASSWORD)))
 
-        self._session = self._driver.session()
-
     def run(self):
         # type: () -> None
         """
@@ -94,26 +113,39 @@ def validate(self):
         self._validate_relation_staleness_pct()
 
     def _delete_stale_nodes(self):
-        statement = """
-        MATCH (n:{type})
-        WHERE n.published_tag <> $published_tag
-        OR NOT EXISTS(n.published_tag)
+        statement = textwrap.dedent("""
+        MATCH (n:{{type}})
+        WHERE {}
         WITH n LIMIT $batch_size
         DETACH DELETE (n)
         RETURN COUNT(*) as count;
+        """)
+        self._batch_delete(statement=self._decorate_staleness(statement), targets=self.target_nodes)
+
+    def _decorate_staleness(self, statement):
+        """
+        Append where clause to the Cypher statement depends on which field to be used to expire stale data.
+        :param statement:
+        :return:
         """
-        self._batch_delete(statement=statement, targets=self.target_nodes)
+        if self.ms_to_expire:
+            return statement.format(textwrap.dedent("""
+            n.publisher_last_updated_epoch_ms < ${marker}
+            OR NOT EXISTS(n.publisher_last_updated_epoch_ms)""".format(marker=MARKER_VAR_NAME)))
+
+        return statement.format(textwrap.dedent("""
+        n.published_tag <> ${marker}
+        OR NOT EXISTS(n.published_tag)""".format(marker=MARKER_VAR_NAME)))
 
     def _delete_stale_relations(self):
-        statement = """
-        MATCH ()-[r:{type}]-()
-        WHERE r.published_tag <> $published_tag
-        OR NOT EXISTS(r.published_tag)
-        WITH r LIMIT $batch_size
-        DELETE r
+        statement = textwrap.dedent("""
+        MATCH ()-[n:{{type}}]-()
+        WHERE {}
+        WITH n LIMIT $batch_size
+        DELETE n
         RETURN count(*) as count;
-        """
-        self._batch_delete(statement=statement, targets=self.target_relations)
+        """)
+        self._batch_delete(statement=self._decorate_staleness(statement), targets=self.target_relations)
 
     def _batch_delete(self, statement, targets):
         """
@@ -126,10 +158,12 @@ def _batch_delete(self, statement, targets):
             LOGGER.info('Deleting stale data of {} with batch size {}'.format(t, self.batch_size))
             total_count = 0
             while True:
-                result = self._execute_cypher_query(statement=statement.format(type=t),
-                                                    param_dict={'batch_size': self.batch_size,
-                                                                'published_tag': self.publish_tag}).single()
-                count = result['count']
+                results = self._execute_cypher_query(statement=statement.format(type=t),
+                                                     param_dict={'batch_size': self.batch_size,
+                                                                 MARKER_VAR_NAME: self.marker},
+                                                     dry_run=self.dry_run)
+                record = next(iter(results), None)
+                count = record['count'] if record else 0
                 total_count = total_count + count
                 if count == 0:
                     break
@@ -160,52 +194,59 @@ def _validate_staleness_pct(self, total_records, stale_records, types):
     def _validate_node_staleness_pct(self):
         # type: () -> None
 
-        total_nodes_statement = """
+        total_nodes_statement = textwrap.dedent("""
         MATCH (n)
         WITH DISTINCT labels(n) as node, count(*) as count
         RETURN head(node) as type, count
-        """
+        """)
 
-        stale_nodes_statement = """
+        stale_nodes_statement = textwrap.dedent("""
         MATCH (n)
-        WHERE n.published_tag <> $published_tag
-        OR NOT EXISTS(n.published_tag)
+        WHERE {}
         WITH DISTINCT labels(n) as node, count(*) as count
         RETURN head(node) as type, count
-        """
+        """)
+
+        stale_nodes_statement = textwrap.dedent(self._decorate_staleness(stale_nodes_statement))
 
         total_records = self._execute_cypher_query(statement=total_nodes_statement)
         stale_records = self._execute_cypher_query(statement=stale_nodes_statement,
-                                                   param_dict={'published_tag': self.publish_tag})
+                                                   param_dict={MARKER_VAR_NAME: self.marker})
         self._validate_staleness_pct(total_records=total_records,
                                      stale_records=stale_records,
                                      types=self.target_nodes)
 
     def _validate_relation_staleness_pct(self):
         # type: () -> None
-        total_relations_statement = """
+        total_relations_statement = textwrap.dedent("""
         MATCH ()-[r]-()
         RETURN type(r) as type, count(*) as count;
-        """
+        """)
 
-        stale_relations_statement = """
-        MATCH ()-[r]-()
-        WHERE r.published_tag <> $published_tag
-        OR NOT EXISTS(r.published_tag)
-        RETURN type(r) as type, count(*) as count
-        """
+        stale_relations_statement = textwrap.dedent("""
+        MATCH ()-[n]-()
+        WHERE {}
+        RETURN type(n) as type, count(*) as count
+        """)
+
+        stale_relations_statement = textwrap.dedent(self._decorate_staleness(stale_relations_statement))
 
         total_records = self._execute_cypher_query(statement=total_relations_statement)
         stale_records = self._execute_cypher_query(statement=stale_relations_statement,
-                                                   param_dict={'published_tag': self.publish_tag})
+                                                   param_dict={MARKER_VAR_NAME: self.marker})
         self._validate_staleness_pct(total_records=total_records,
                                      stale_records=stale_records,
                                      types=self.target_relations)
 
-    def _execute_cypher_query(self, statement, param_dict={}):
+    def _execute_cypher_query(self, statement, param_dict={}, dry_run=False):
         # type: (str, Dict[str, Any]) -> Iterable[Dict[str, Any]]
         LOGGER.info('Executing Cypher query: {statement} with params {params}: '.format(statement=statement,
                                                                                         params=param_dict))
+
+        if dry_run:
+            LOGGER.info('Skipping for it is a dryrun')
+            return []
+
         start = time.time()
         try:
             with self._driver.session() as session:

setup.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 from setuptools import setup, find_packages
 
 
-__version__ = '2.4.3'
+__version__ = '2.5.0'
 
 requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
 with open(requirements_path) as requirements_file:
