Skip to content

Commit ca6da08

Browse files
authored
Fix meta file processing in storage and improve schedule job retrieval (#2193)
* Fix meta file processing in storage and improve schedule job retrieval
* Changed update_unfinished_jobs to use a single get_jobs_by_status call
1 parent 5e28e31 commit ca6da08

File tree

7 files changed

+159
-58
lines changed

7 files changed

+159
-58
lines changed

nvflare/apis/impl/job_def_manager.py

Lines changed: 64 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import time
2121
import uuid
2222
from abc import ABC, abstractmethod
23-
from typing import Any, Dict, List, Optional
23+
from typing import Any, Dict, List, Optional, Union
2424

2525
from nvflare.apis.fl_context import FLContext
2626
from nvflare.apis.job_def import Job, JobDataKey, JobMetaKey, job_from_meta
@@ -30,47 +30,77 @@
3030
from nvflare.fuel.utils import fobs
3131
from nvflare.fuel.utils.zip_utils import unzip_all_from_bytes, zip_directory_to_bytes
3232

33+
_OBJ_TAG_SCHEDULED = "scheduled"
34+
35+
36+
class JobInfo:
    """Information about one stored job, as handed to a job filter during a scan."""

    def __init__(self, meta: dict, job_id: str, uri: str):
        """
        Args:
            meta: the job's meta dict as read from the job store
            job_id: ID of the job
            uri: URI of the job object in the job store
        """
        self.meta = meta
        self.job_id = job_id
        self.uri = uri

    def __repr__(self) -> str:
        # meta can be large, so only the identifying fields are shown
        return f"{type(self).__name__}(job_id={self.job_id!r}, uri={self.uri!r})"
41+
3342

3443
class _JobFilter(ABC):
3544
@abstractmethod
36-
def filter_job(self, meta: dict) -> bool:
45+
def filter_job(self, info: JobInfo) -> bool:
3746
pass
3847

3948

4049
class _StatusFilter(_JobFilter):
    """Collects the jobs whose status is one of the requested values."""

    def __init__(self, status_to_check):
        """
        Args:
            status_to_check: a single status value, or a list/tuple/set of status values
        """
        self.result = []
        if not isinstance(status_to_check, (list, tuple, set, frozenset)):
            # a single status was given: wrap it so filter_job can always test membership
            status_to_check = [status_to_check]
        self.status_to_check = status_to_check

    def filter_job(self, info: JobInfo):
        """Append the job to ``result`` when its status matches; always continue the scan."""
        status = info.meta.get(JobMetaKey.STATUS.value)
        if status in self.status_to_check:
            self.result.append(job_from_meta(info.meta))
        return True
4962

5063

5164
class _AllJobsFilter(_JobFilter):
    """Collects every job that is scanned."""

    def __init__(self):
        self.result = []

    def filter_job(self, info: JobInfo):
        """Append the job built from ``info.meta`` to ``result``; always continue the scan."""
        self.result.append(job_from_meta(info.meta))
        return True
5871

5972

6073
class _ReviewerFilter(_JobFilter):
    """Collects the jobs whose approvals do not contain the given reviewer."""

    def __init__(self, reviewer_name):
        """Not used yet, for use in future implementations."""
        self.result = []
        self.reviewer_name = reviewer_name

    def filter_job(self, info: JobInfo):
        """Append the job when it has no approvals or none from this reviewer; always continue the scan."""
        approvals = info.meta.get(JobMetaKey.APPROVALS.value)
        if not approvals or self.reviewer_name not in approvals:
            self.result.append(job_from_meta(info.meta))
        return True
7184

7285

73-
# TODO:: use try block around storage calls
86+
class _ScheduleJobFilter(_JobFilter):
    """
    This filter is optimized for selecting jobs to schedule since it is used so frequently (every 1 sec).
    """

    def __init__(self, store):
        """
        Args:
            store: the job store; used to tag objects that no longer need to be scanned
        """
        self.store = store
        self.result = []

    def filter_job(self, info: JobInfo):
        """Collect submitted jobs and tag every job that already has another status.

        Jobs without any status are neither collected nor tagged. Always continues the scan.
        """
        status = info.meta.get(JobMetaKey.STATUS.value)
        if status == RunStatus.SUBMITTED.value:
            self.result.append(job_from_meta(info.meta))
        elif status:
            # skip this job in all future calls (so the meta file of this job won't be read)
            self.store.tag_object(uri=info.uri, tag=_OBJ_TAG_SCHEDULED)
        return True
74104

75105

76106
class SimpleJobDefManager(JobDefManagerSpec):
@@ -239,28 +269,40 @@ def get_all_jobs(self, fl_ctx: FLContext) -> List[Job]:
239269
self._scan(job_filter, fl_ctx)
240270
return job_filter.result
241271

242-
def _scan(self, job_filter: _JobFilter, fl_ctx: FLContext):
272+
def get_jobs_to_schedule(self, fl_ctx: FLContext) -> List[Job]:
    """Get job candidates for scheduling.

    Objects already carrying the "scheduled" tag are excluded from the scan.

    Args:
        fl_ctx: the FL context

    Returns: list of jobs collected by the schedule filter
    """
    job_filter = _ScheduleJobFilter(self._get_job_store(fl_ctx))
    self._scan(job_filter, fl_ctx, skip_tag=_OBJ_TAG_SCHEDULED)
    return job_filter.result
276+
277+
def _scan(self, job_filter: _JobFilter, fl_ctx: FLContext, skip_tag=None):
243278
store = self._get_job_store(fl_ctx)
244-
jid_paths = store.list_objects(self.uri_root)
245-
if not jid_paths:
279+
obj_uris = store.list_objects(self.uri_root, without_tag=skip_tag)
280+
self.log_debug(fl_ctx, f"objects to scan: {len(obj_uris)}")
281+
if not obj_uris:
246282
return
247283

248-
for jid_path in jid_paths:
249-
jid = pathlib.PurePath(jid_path).name
250-
251-
meta = store.get_meta(self.job_uri(jid))
284+
for uri in obj_uris:
285+
jid = pathlib.PurePath(uri).name
286+
job_uri = self.job_uri(jid)
287+
meta = store.get_meta(job_uri)
252288
if meta:
253-
ok = job_filter.filter_job(meta)
289+
ok = job_filter.filter_job(JobInfo(meta, jid, job_uri))
254290
if not ok:
255291
break
256292

257-
def get_jobs_by_status(self, status, fl_ctx: FLContext) -> List[Job]:
293+
def get_jobs_by_status(self, status: Union[RunStatus, List[RunStatus]], fl_ctx: FLContext) -> List[Job]:
    """Get jobs that are in the specified status.

    Args:
        status: a single status value or a list of status values
        fl_ctx: the FL context

    Returns: list of jobs that are in specified status
    """
    job_filter = _StatusFilter(status)
    self._scan(job_filter, fl_ctx)
    return job_filter.result
261303

262304
def get_jobs_waiting_for_review(self, reviewer_name: str, fl_ctx: FLContext) -> List[Job]:
    """Get the jobs whose approvals do not include the given reviewer.

    Args:
        reviewer_name: name of the reviewer
        fl_ctx: the FL context

    Returns: list of jobs collected by the reviewer filter
    """
    job_filter = _ReviewerFilter(reviewer_name)
    self._scan(job_filter, fl_ctx)
    return job_filter.result
266308

nvflare/apis/job_def_manager_spec.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
from abc import ABC, abstractmethod
16-
from typing import Any, Dict, List, Optional
16+
from typing import Any, Dict, List, Optional, Union
1717

1818
from nvflare.apis.fl_component import FLComponent
1919
from nvflare.apis.fl_context import FLContext
@@ -103,7 +103,8 @@ def get_job_data(self, jid: str, fl_ctx: FLContext) -> dict:
103103
fl_ctx (FLContext): FLContext information
104104
105105
Returns:
106-
a dict to hold the job data and workspace. With the format: {JobDataKey.JOB_DATA.value: stored_data, JobDataKey.WORKSPACE_DATA: workspace_data}
106+
a dict to hold the job data and workspace. With the format:
107+
{JobDataKey.JOB_DATA.value: stored_data, JobDataKey.WORKSPACE_DATA: workspace_data}
107108
108109
"""
109110
pass
@@ -145,6 +146,18 @@ def set_status(self, jid: str, status: RunStatus, fl_ctx: FLContext):
145146
"""
146147
pass
147148

149+
@abstractmethod
def get_jobs_to_schedule(self, fl_ctx: FLContext) -> List[Job]:
    """Get job candidates for scheduling.

    Args:
        fl_ctx: FL context

    Returns: list of jobs for scheduling

    """
    pass
160+
148161
@abstractmethod
149162
def get_all_jobs(self, fl_ctx: FLContext) -> List[Job]:
150163
"""Gets all Jobs in the system.
@@ -158,11 +171,11 @@ def get_all_jobs(self, fl_ctx: FLContext) -> List[Job]:
158171
pass
159172

160173
@abstractmethod
161-
def get_jobs_by_status(self, run_status: RunStatus, fl_ctx: FLContext) -> List[Job]:
174+
def get_jobs_by_status(self, run_status: Union[RunStatus, List[RunStatus]], fl_ctx: FLContext) -> List[Job]:
162175
"""Gets Jobs of a specified status.
163176
164177
Args:
165-
run_status (RunStatus): status to filter for
178+
run_status: status to filter for: a single or a list of status values
166179
fl_ctx (FLContext): FLContext information
167180
168181
Returns:

nvflare/apis/storage.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,12 @@ def update_data(self, uri: str, data: bytes):
9191
pass
9292

9393
@abstractmethod
94-
def list_objects(self, path: str) -> List[str]:
94+
def list_objects(self, path: str, without_tag=None) -> List[str]:
9595
"""Lists all objects in the specified path.
9696
9797
Args:
9898
path: the path to the objects
99+
without_tag: skip the objects with this specified tag
99100
100101
Returns:
101102
list of URIs of objects
@@ -163,3 +164,14 @@ def delete_object(self, uri: str):
163164
164165
"""
165166
pass
167+
168+
@abstractmethod
def tag_object(self, uri: str, tag: str, data=None):
    """Tag an object with specified tag and data.

    Args:
        uri: URI of the object
        tag: tag to be placed on the object
        data: data associated with the tag.

    Returns: None

    """
    pass

0 commit comments

Comments
 (0)