-
Notifications
You must be signed in to change notification settings - Fork 35
Expand file tree
/
Copy pathqueue_pmh_rt_record.py
More file actions
127 lines (100 loc) · 4.79 KB
/
queue_pmh_rt_record.py
File metadata and controls
127 lines (100 loc) · 4.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import argparse
from time import sleep
from time import time
from sqlalchemy import text
from app import db
from app import logger
from pmh_record import PmhRecord
from recordthresher.record import RecordthresherParentRecord
from recordthresher.record_maker import PmhRecordMaker
from util import elapsed
from util import safe_commit
import endpoint # magic
class QueuePmhRTRecord:
    """Worker that turns queued PMH records into recordthresher records.

    Two modes:
      * single-record: ``worker_run(pmh_id=...)`` rebuilds one record by id;
      * queue loop: repeatedly claims chunks from
        ``recordthresher.pmh_record_queue``, builds records, and deletes the
        processed queue rows.
    """

    def worker_run(self, **kwargs):
        """Process PMH records.

        Keyword args:
            pmh_id (str, optional): process just this one PMH record.
            chunk (int): queue rows to claim per iteration (default 100).
            limit (int, optional): stop after roughly this many records;
                unbounded when omitted.
        """
        single_id = kwargs.get("pmh_id", None)
        chunk_size = kwargs.get("chunk", 100)
        limit = kwargs.get("limit", None)

        if limit is None:
            # run forever: the loop condition below never becomes false
            limit = float("inf")

        if single_id:
            pmh = PmhRecord.query.filter(PmhRecord.id == single_id).scalar()
            if record := PmhRecordMaker.make_record(pmh):
                db.session.merge(record)
                secondary_records = PmhRecordMaker.make_secondary_repository_responses(record)
                for secondary_record in secondary_records:
                    db.session.merge(secondary_record)
                    # link each secondary record back to the primary record
                    db.session.merge(
                        RecordthresherParentRecord(
                            record_id=secondary_record.id,
                            parent_record_id=record.id
                        )
                    )
            safe_commit(db) or logger.info("COMMIT fail")
        else:
            num_updated = 0

            while num_updated < limit:
                start_time = time()

                pmh_ids = self.fetch_queue_chunk(chunk_size)

                if not pmh_ids:
                    logger.info('no queued pmh records ready to update. waiting...')
                    sleep(5)
                    continue

                # keyed by secondary-record id so duplicates across the chunk
                # collapse to a single merge
                secondary_records = {}
                parent_relationships = {}

                for pmh_id in pmh_ids:
                    if pmh := PmhRecord.query.filter(PmhRecord.id == pmh_id).scalar():
                        if record := PmhRecordMaker.make_record(pmh):
                            db.session.merge(record)
                            record_secondary_records = PmhRecordMaker.make_secondary_repository_responses(record)
                            for record_secondary_record in record_secondary_records:
                                secondary_records[record_secondary_record.id] = record_secondary_record
                                parent_relationships[record_secondary_record.id] = RecordthresherParentRecord(
                                    record_id=record_secondary_record.id,
                                    parent_record_id=record.id
                                )

                for secondary_record in secondary_records.values():
                    db.session.merge(secondary_record)

                for parent_relationship in parent_relationships.values():
                    db.session.merge(parent_relationship)

                # processed rows leave the queue entirely (fetch_queue_chunk
                # only marked them as started)
                db.session.execute(
                    text('''
                        delete from recordthresher.pmh_record_queue q
                        where q.pmh_id = any(:pmh_ids)
                    ''').bindparams(pmh_ids=pmh_ids)
                )

                commit_start_time = time()
                safe_commit(db) or logger.info("commit fail")
                logger.info(f'commit took {elapsed(commit_start_time, 2)} seconds')

                # count what was actually claimed, not the requested chunk
                # size, so --limit is honored accurately on short chunks
                num_updated += len(pmh_ids)
                logger.info(f'processed {len(pmh_ids)} PMH records in {elapsed(start_time, 2)} seconds')

    def fetch_queue_chunk(self, chunk_size):
        """Atomically claim up to ``chunk_size`` unstarted queue rows.

        Uses ``for update skip locked`` so concurrent workers never claim
        the same rows; claimed rows get ``started = now()``. Returns the
        claimed pmh_id values (possibly an empty list).
        """
        logger.info("looking for new jobs")

        queue_query = text("""
            with queue_chunk as (
                select pmh_id
                from recordthresher.pmh_record_queue
                where started is null
                order by rand
                limit :chunk
                for update skip locked
            )
            update recordthresher.pmh_record_queue q
            set started = now()
            from queue_chunk
            where q.pmh_id = queue_chunk.pmh_id
            returning q.pmh_id;
        """).bindparams(chunk=chunk_size)

        job_time = time()
        pmh_id_list = [row[0] for row in db.engine.execute(queue_query.execution_options(autocommit=True)).all()]
        logger.info(f'got {len(pmh_id_list)} ids, took {elapsed(job_time)} seconds')

        return pmh_id_list
if __name__ == "__main__":
    # CLI entry point: either fix up one record (--pmh_id) or run the
    # queue-processing loop with optional --limit / --chunk tuning.
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('--pmh_id', nargs="?", type=str, help="pmh_id you want to update the RT record for")
    arg_parser.add_argument('--limit', "-l", nargs="?", type=int, help="how many records to update")
    arg_parser.add_argument('--chunk', "-ch", nargs="?", default=100, type=int, help="how many records to update at once")
    cli_args = vars(arg_parser.parse_args())

    QueuePmhRTRecord().worker_run(**cli_args)