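"""Queue worker for Pub objects (forked from ourresearch/oadoi).

Pulls batches of ids off pub_queue (or pub_refresh_queue when run with
--method refresh), claims them with FOR UPDATE SKIP LOCKED, runs the
requested Pub method on each object, and marks the rows finished.

Typical invocation (assumed from the flags defined below):
    python queue_pub.py --run --method update --chunk 100
"""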
import argparse
import logging
import os
import random
from time import sleep
from time import time

from sqlalchemy import orm
from sqlalchemy import text

from app import db
from app import logger
from endpoint import Endpoint  # magic
from pub import Pub
from queue_main import DbQueue
from util import elapsed
from util import normalize_doi
from util import run_sql


class DbQueuePub(DbQueue):

    def table_name(self, job_type):
        table_name = "pub"
        return table_name

    def process_name(self, job_type):
        process_name = None  # avoid UnboundLocalError when parsed_vars is unset
        if self.parsed_vars:
            process_name = self.parsed_vars.get("method")
        return process_name

    def worker_run(self, **kwargs):
        single_obj_id = kwargs.get("id", None)
        chunk = kwargs.get("chunk", 100)
        limit = kwargs.get("limit", 10)
        run_class = Pub
        run_method = kwargs.get("method")
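
        # Three modes: a single id passed on the command line, the refresh
        # queue (--method refresh), or the normal pub_queue.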
        if single_obj_id:
            limit = 1
            queue_table = None
        elif run_method == "refresh":
            queue_table = "pub_refresh_queue"
            if not limit:
                limit = 1000
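
            # Claim up to {chunk} unstarted rows: FOR UPDATE SKIP LOCKED keeps
            # concurrent workers from grabbing the same rows; mark them started
            # and return their ids.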
            text_query_pattern = """
                with refresh_queue as (
                    select id
                    from {queue_table}
                    where started is null
                    order by
                        priority desc,
                        finished nulls first,
                        started,
                        rand
                    limit {chunk}
                    for update skip locked
                )
                update {queue_table} queue_rows_to_update
                set started = now()
                from refresh_queue
                where refresh_queue.id = queue_rows_to_update.id
                returning refresh_queue.id;"""
            text_query = text_query_pattern.format(
                chunk=chunk,
                queue_table=queue_table
            )
        else:
            queue_table = "pub_queue"
            if not limit:
                limit = 1000
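
            # Same claim-and-mark pattern for the normal pub_queue, ordered by
            # least recently finished.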
text_query_pattern = """WITH update_pub_queue AS (
SELECT id
FROM {queue_table}
WHERE started is null
order by finished asc
nulls first
LIMIT {chunk}
FOR UPDATE SKIP LOCKED
)
UPDATE {queue_table} queue_rows_to_update
SET started=now()
FROM update_pub_queue
WHERE update_pub_queue.id = queue_rows_to_update.id
RETURNING update_pub_queue.id;"""
text_query = text_query_pattern.format(
limit=limit,
chunk=chunk,
queue_table=queue_table
)

        index = 0
        start_time = time()

        while True:
            new_loop_start_time = time()

            if single_obj_id:
                single_obj_id = normalize_doi(single_obj_id)
                objects = [run_class.query.filter(run_class.id == single_obj_id).first()]
            else:
                logger.info("looking for new jobs")

                job_time = time()
                row_list = db.engine.execute(text(text_query).execution_options(autocommit=True)).fetchall()
                object_ids = [row[0] for row in row_list]
                logger.info("got ids, took {} seconds".format(elapsed(job_time)))

                job_time = time()
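                # undefer('*') loads every deferred column up front, so each Pub
                # comes back fully populated in this single query.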
                q = db.session.query(Pub).options(orm.undefer('*')).filter(Pub.id.in_(object_ids))
                objects = q.all()
                logger.info("got pub objects in {} seconds".format(elapsed(job_time)))

                # shuffle them or they sort by doi order
                random.shuffle(objects)

                # objects = Pub.query.from_statement(text(text_query)).execution_options(autocommit=True).all()
                # objects = run_class.query.from_statement(text(text_query)).execution_options(autocommit=True).all()
                # id_rows = db.engine.execute(text(text_query)).fetchall()
                # ids = [row[0] for row in id_rows]
                #
                # job_time = time()
                # objects = run_class.query.filter(run_class.id.in_(ids)).all()
                # logger.info(u"finished get-new-objects query in {} seconds".format(elapsed(job_time)))

            if not objects:
                # logger.info(u"sleeping for 5 seconds, then going again")
                sleep(5)
                continue

            object_ids = [obj.id for obj in objects]
            self.update_fn(run_class, run_method, objects, index=index)
            # logger.info(u"finished update_fn")

            if queue_table:
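                # The ids are DOIs, so quote them and double any ' or % before
                # interpolating them into the raw SQL string.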
object_ids_str = ",".join(["'{}'".format(id.replace("'", "''")) for id in object_ids])
object_ids_str = object_ids_str.replace("%", "%%") #sql escaping
sql_command = "update {queue_table} set finished=now(), started=null where id in ({ids})".format(
queue_table=queue_table, ids=object_ids_str)
# logger.info(u"sql command to update finished is: {}".format(sql_command))
run_sql(db, sql_command)
# logger.info(u"finished run_sql")
# finished is set in update_fn
index += 1
if single_obj_id:
return
else:
self.print_update(new_loop_start_time, chunk, limit, start_time, index)
if __name__ == "__main__":
if os.getenv('OADOI_LOG_SQL'):
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
db.session.configure()
parser = argparse.ArgumentParser(description="Run stuff.")
parser.add_argument('--id', nargs="?", type=str, help="id of the one thing you want to update (case sensitive)")
parser.add_argument('--doi', nargs="?", type=str, help="id of the one thing you want to update (case insensitive)")
parser.add_argument('--method', nargs="?", type=str, default="update", help="method name to run")
parser.add_argument('--reset', default=False, action='store_true', help="do you want to just reset?")
parser.add_argument('--run', default=False, action='store_true', help="to run the queue")
parser.add_argument('--status', default=False, action='store_true', help="to logger.info(the status")
parser.add_argument('--dynos', default=None, type=int, help="scale to this many dynos")
parser.add_argument('--logs', default=False, action='store_true', help="logger.info(out logs")
parser.add_argument('--monitor', default=False, action='store_true', help="monitor till done, then turn off dynos")
parser.add_argument('--kick', default=False, action='store_true', help="put started but unfinished dois back to unstarted so they are retried")
parser.add_argument('--limit', "-l", nargs="?", type=int, help="how many jobs to do")
parser.add_argument('--chunk', "-ch", nargs="?", default=500, type=int, help="how many to take off db at once")
parsed_args = parser.parse_args()
job_type = "normal" #should be an object attribute
my_queue = DbQueuePub()
my_queue.parsed_vars = vars(parsed_args)
my_queue.run_right_thing(parsed_args, job_type)