Skip to content
This repository was archived by the owner on Apr 20, 2020. It is now read-only.

Rework scheduling logic#323

Open
m-novikov wants to merge 16 commits intoilastik:masterfrom
m-novikov:sched
Open

Rework scheduling logic#323
m-novikov wants to merge 16 commits intoilastik:masterfrom
m-novikov:sched

Conversation

@m-novikov
Copy link
Contributor

No description provided.

Copy link
Contributor

@k-dominik k-dominik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good to see fewer explicit locks! Really nice to rely on qeues for that.

def _distribute_work(self):
while True:
try:
task = self.unassigned_tasks.get()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we stop the scheduler as well?

if task is STOP:
    return

....

if worker is STOP:
    return

....

def stop(...):
    ...
    self.unassigned_tasks.put_nowait(STOP)
    self.ready_workers.put_nowait(STOP)

Copy link
Contributor Author

@m-novikov m-novikov Aug 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a daemon thread and it doesn't do any actual work, so it will gracefully shut down by itself with app termination. Otherwise, we would need to add timeouts to both queues gets and introduce polling to properly terminate scheduler.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't putting STOP in both queues solve the problem?

Keeping a running thread after ThreadPool.stop() doesn't seem like a good behavior (it consumes resources and prevents GC for some objects), but this is probably not a big deal since we have 1 pool for the entire lifetime of a process anyway.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then we need to make STOP object comparable with objects in unassigned_tasks.

Copy link
Member

@emilmelnikov emilmelnikov Aug 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be doable:

@functools.total_ordering
class Stop:
    __lt__ = lambda _self, _other: True
    __eq__ = lambda self, other: self.__class__ == other.__class__


STOP = Stop()

And we also need to fix Request, which should be done anyway because right now it's __lt__ method is broken (works only with other request objects), and there is no total ordering:

@functools.total_ordering
class Request:
    ...

    def __lt__(self, other):
        if not hasattr(other, "_priority"):
            return NotImplemented
        return self._priority < other._priority

    def __eq__(self, other):
        if not hasattr(other, "_priority"):
            return NotImplemented
        return self._priority == other._priority            

This is probably even better because STOP will appear immediately as the next element in all queues, which is what we want.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See updated revision

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Explicitly passing the priority is an even better approach.

# The ThreadPool._Worker loop has a local reference (next_task),
# so wait just a tic for the ThreadPool worker to cycle back to the top of its loop (and discard the reference)
time.sleep(0.1)
gc.collect()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this in a test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It broke when I didn't free the reference :) More robust test would probably try to overload thread pool with work and than check if there was any memory leakage.

__slots__ = ("obj", "exc", "priority")

def __init__(self, *, obj=None, exc=None, priority=None):
def __init__(self, *, obj: Any = None, exc: Exception = None, priority: Optional[List[int]] = None) -> None:
Copy link
Member

@emilmelnikov emilmelnikov Aug 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use a dataclass for this struct? Seems appropriate.

Also, could use TypeVar for obj in __init__ and unwrap:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't really want to import dataclasses here, little benefit as I need my constructor for validation.

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants