Skip to content
This repository was archived by the owner on May 29, 2024. It is now read-only.

Allow non-threadsafe projects to be compatible with --tests-per-worker=1 #79

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 55 additions & 39 deletions pytest_parallel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,20 @@ def run_test(session, item, nextitem):
raise session.Interrupted(session.shouldstop)


def process_with_no_threads(config, queue, session, tests_per_worker, errors):
return do_work("Thread-0", queue, session, errors)



def process_with_threads(config, queue, session, tests_per_worker, errors):
# This function will be called from subprocesses, forked from the main
# pytest process. First thing we need to do is to change config's value
# so we know we are running as a worker.
config.parallel_worker = True

if tests_per_worker < 2:
return process_with_no_threads(config, queue, session, tests_per_worker, errors)

threads = []
for _ in range(tests_per_worker):
thread = ThreadWorker(queue, session, errors)
Expand All @@ -78,29 +86,33 @@ def __init__(self, queue, session, errors):
self.errors = errors

def run(self):
pickling_support.install()
while True:
try:
index = self.queue.get()
if index == 'stop':
self.queue.task_done()
break
except ConnectionRefusedError:
time.sleep(.1)
continue
item = self.session.items[index]
try:
run_test(self.session, item, None)
except BaseException:
import pickle
import sys

self.errors.put((self.name, pickle.dumps(sys.exc_info())))
finally:
try:
self.queue.task_done()
except ConnectionRefusedError:
pass
return do_work(self.name, self.queue, self.session, self.errors)


def do_work(name, queue, session, errors):
pickling_support.install()
while True:
try:
index = queue.get()
if index == 'stop':
queue.task_done()
break
except ConnectionRefusedError:
time.sleep(.1)
continue
item = session.items[index]
try:
run_test(session, item, None)
except BaseException:
import pickle
import sys

errors.put((name, pickle.dumps(sys.exc_info())))
finally:
try:
queue.task_done()
except ConnectionRefusedError:
pass


@pytest.mark.trylast
Expand Down Expand Up @@ -207,8 +219,25 @@ def __init__(self, config):

self.workers = workers

# get the number of tests per worker
tests_per_worker = parse_config(config, 'tests_per_worker')
try:
if tests_per_worker == 'auto':
tests_per_worker = 50
if tests_per_worker:
tests_per_worker = int(tests_per_worker)
else:
tests_per_worker = 1
except ValueError:
raise ValueError(('tests_per_worker can only be '
'an integer or "auto"'))
self.tests_per_worker = tests_per_worker

@pytest.mark.tryfirst
def pytest_sessionstart(self, session):
if self.tests_per_worker < 2:
return # Don't need no stinking thread safety!

# make the session threadsafe
_pytest.runner.SetupState = ThreadLocalSetupState

Expand All @@ -231,26 +260,13 @@ def pytest_runtestloop(self, session):
if session.config.option.collectonly:
return True

# get the number of tests per worker
tests_per_worker = parse_config(session.config, 'tests_per_worker')
try:
if tests_per_worker == 'auto':
tests_per_worker = 50
if tests_per_worker:
tests_per_worker = int(tests_per_worker)
evenly_divided = math.ceil(len(session.items)/self.workers)
tests_per_worker = min(tests_per_worker, evenly_divided)
else:
tests_per_worker = 1
except ValueError:
raise ValueError(('tests_per_worker can only be '
'an integer or "auto"'))

if self.workers > 1:
worker_noun, process_noun = ('workers', 'processes')
else:
worker_noun, process_noun = ('worker', 'process')

evenly_divided = math.ceil(len(session.items)/self.workers)
tests_per_worker = min(self.tests_per_worker, evenly_divided)
if tests_per_worker > 1:
test_noun, thread_noun = ('tests', 'threads')
else:
Expand Down Expand Up @@ -328,7 +344,7 @@ def send_response(self, event_name, **arguments):
self.responses_queue.put((event_name, arguments))

def pytest_runtest_logreport(self, report):
# We want workers to report to it's master.
# We want workers to report to its master.
# Without this "if", master will try to report to itself.
if self._config.parallel_worker:
data = self._config.hook.pytest_report_to_serializable(
Expand Down