queue_doi_rt_record.py
151 lines (120 loc) · 5.35 KB
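"""Queue worker that builds RecordThresher records for DOIs.

Pulls DOIs from recordthresher.doi_record_queue, makes Crossref,
Parseland, and secondary repository records for each, and deletes
processed rows from the queue.
"""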
import argparse
import datetime
from threading import Thread
from time import sleep, time

from sqlalchemy import text

from app import db
from app import logger
from pub import Pub
from recordthresher.record import RecordthresherParentRecord
from recordthresher.record_maker import CrossrefRecordMaker, PmhRecordMaker
from recordthresher.record_maker.parseland_record_maker import ParselandRecordMaker
from util import elapsed
from util import safe_commit

import endpoint  # magic
PROCESSED = 0


def print_stats():
    # Log the cumulative processing rate every 5 seconds.
    start = datetime.datetime.now()
    while True:
        now = datetime.datetime.now()
        hrs_running = (now - start).total_seconds() / (60 * 60)
        rate_hr = round(PROCESSED / hrs_running, 2)
        logger.info(f'[*] Processing rate: {rate_hr}/hr')
        sleep(5)
class QueueDoiRtRecord:
    def worker_run(self, **kwargs):
        global PROCESSED
        single_id = kwargs.get("doi", None)
        chunk_size = kwargs.get("chunk", 100)
        limit = kwargs.get("limit", None)

        if limit is None:
            limit = float("inf")

        if single_id:
            # One-off mode: rebuild the record for a single DOI.
            pub = Pub.query.get(single_id)
            if record := CrossrefRecordMaker.make_record(pub):
                db.session.merge(record)
                PROCESSED += 1
                safe_commit(db) or logger.info("COMMIT fail")
        else:
            # Queue mode: claim chunks of DOIs until the limit is reached.
            num_updated = 0

            while num_updated < limit:
                start_time = time()

                dois = self.fetch_queue_chunk(chunk_size)

                if not dois:
                    logger.info('no queued DOI records ready to update. waiting...')
                    sleep(5)
                    continue

                # Track merged record ids so the same row isn't merged
                # into the session twice within one chunk.
                seen_record_ids = set()

                for doi in dois:
                    logger.info(f'making RecordThresher record for DOI {doi}')
                    if pub := Pub.query.get(doi):
                        if record := CrossrefRecordMaker.make_record(pub):
                            if record.id not in seen_record_ids:
                                db.session.merge(record)
                                seen_record_ids.add(record.id)

                            if pl_record := ParselandRecordMaker.make_record(pub):
                                if pl_record.id not in seen_record_ids:
                                    db.session.merge(pl_record)
                                    seen_record_ids.add(pl_record.id)

                            # Link each secondary repository record back to
                            # the primary record via a parent-record row.
                            secondary_records = PmhRecordMaker.make_secondary_repository_responses(record)
                            for secondary_record in secondary_records:
                                if secondary_record.id not in seen_record_ids:
                                    db.session.merge(secondary_record)
                                    seen_record_ids.add(secondary_record.id)

                                    db.session.merge(
                                        RecordthresherParentRecord(
                                            record_id=secondary_record.id,
                                            parent_record_id=record.id
                                        )
                                    )

                    PROCESSED += 1

                # Processed DOIs leave the queue in the same transaction as
                # the record merges.
                db.session.execute(
                    text('''
                        delete from recordthresher.doi_record_queue q
                        where q.doi = any(:dois)
                    ''').bindparams(dois=dois)
                )

                commit_start_time = time()
                safe_commit(db) or logger.info("commit fail")
                logger.info(f'commit took {elapsed(commit_start_time, 2)} seconds')

                # Count what was actually fetched, not the requested chunk
                # size, so --limit is honored on short chunks.
                num_updated += len(dois)
                logger.info(f'processed {len(dois)} DOI records in {elapsed(start_time, 2)} seconds')
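    # fetch_queue_chunk claims work with Postgres's "for update skip locked":
    # concurrent workers each lock and claim a disjoint set of queue rows, so
    # no DOI is handed to two workers at once. Setting started = now() marks
    # the rows as claimed, and the autocommit execution option persists that
    # claim immediately, before the records are built.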
    def fetch_queue_chunk(self, chunk_size):
        logger.info("looking for new jobs")

        queue_query = text("""
            with queue_chunk as (
                select doi
                from recordthresher.doi_record_queue
                where started is null
                order by updated desc nulls last, rand
                limit :chunk
                for update skip locked
            )
            update recordthresher.doi_record_queue q
            set started = now()
            from queue_chunk
            where q.doi = queue_chunk.doi
            returning q.doi;
        """).bindparams(chunk=chunk_size)

        job_time = time()
        doi_list = [row[0] for row in db.engine.execute(
            queue_query.execution_options(autocommit=True)).all()]
        logger.info(f'got {len(doi_list)} DOIs, took {elapsed(job_time)} seconds')

        return doi_list
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--doi', nargs="?", type=str,
                        help="DOI you want to update the RT record for")
    parser.add_argument('--limit', "-l", nargs="?", type=int,
                        help="how many records to update")
    parser.add_argument('--chunk', "-ch", nargs="?", default=100, type=int,
                        help="how many records to update at once")
    parsed_args = parser.parse_args()

    my_queue = QueueDoiRtRecord()
    Thread(target=print_stats, daemon=True).start()
    my_queue.worker_run(**vars(parsed_args))
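
# Example invocations, assuming the usual app environment is configured
# (the DOI below is hypothetical):
#   python queue_doi_rt_record.py --doi 10.1234/example
#   python queue_doi_rt_record.py --limit 1000 --chunk 50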