diff --git a/pytest_parallel/__init__.py b/pytest_parallel/__init__.py index 5702a64..474488b 100644 --- a/pytest_parallel/__init__.py +++ b/pytest_parallel/__init__.py @@ -56,12 +56,20 @@ def run_test(session, item, nextitem): raise session.Interrupted(session.shouldstop) +def process_with_no_threads(config, queue, session, tests_per_worker, errors): + return do_work("Thread-0", queue, session, errors) + + + def process_with_threads(config, queue, session, tests_per_worker, errors): # This function will be called from subprocesses, forked from the main # pytest process. First thing we need to do is to change config's value # so we know we are running as a worker. config.parallel_worker = True + if tests_per_worker < 2: + return process_with_no_threads(config, queue, session, tests_per_worker, errors) + threads = [] for _ in range(tests_per_worker): thread = ThreadWorker(queue, session, errors) @@ -78,29 +86,33 @@ def __init__(self, queue, session, errors): self.errors = errors def run(self): - pickling_support.install() - while True: - try: - index = self.queue.get() - if index == 'stop': - self.queue.task_done() - break - except ConnectionRefusedError: - time.sleep(.1) - continue - item = self.session.items[index] - try: - run_test(self.session, item, None) - except BaseException: - import pickle - import sys - - self.errors.put((self.name, pickle.dumps(sys.exc_info()))) - finally: - try: - self.queue.task_done() - except ConnectionRefusedError: - pass + return do_work(self.name, self.queue, self.session, self.errors) + + +def do_work(name, queue, session, errors): + pickling_support.install() + while True: + try: + index = queue.get() + if index == 'stop': + queue.task_done() + break + except ConnectionRefusedError: + time.sleep(.1) + continue + item = session.items[index] + try: + run_test(session, item, None) + except BaseException: + import pickle + import sys + + errors.put((name, pickle.dumps(sys.exc_info()))) + finally: + try: + queue.task_done() + except ConnectionRefusedError: + pass @pytest.mark.trylast @@ -207,8 +219,25 @@ def __init__(self, config): self.workers = workers + # get the number of tests per worker + tests_per_worker = parse_config(config, 'tests_per_worker') + try: + if tests_per_worker == 'auto': + tests_per_worker = 50 + if tests_per_worker: + tests_per_worker = int(tests_per_worker) + else: + tests_per_worker = 1 + except ValueError: + raise ValueError(('tests_per_worker can only be ' + 'an integer or "auto"')) + self.tests_per_worker = tests_per_worker + @pytest.mark.tryfirst def pytest_sessionstart(self, session): + if self.tests_per_worker < 2: + return # Don't need no stinking thread safety! + # make the session threadsafe _pytest.runner.SetupState = ThreadLocalSetupState @@ -231,26 +260,13 @@ def pytest_runtestloop(self, session): if session.config.option.collectonly: return True - # get the number of tests per worker - tests_per_worker = parse_config(session.config, 'tests_per_worker') - try: - if tests_per_worker == 'auto': - tests_per_worker = 50 - if tests_per_worker: - tests_per_worker = int(tests_per_worker) - evenly_divided = math.ceil(len(session.items)/self.workers) - tests_per_worker = min(tests_per_worker, evenly_divided) - else: - tests_per_worker = 1 - except ValueError: - raise ValueError(('tests_per_worker can only be ' - 'an integer or "auto"')) - if self.workers > 1: worker_noun, process_noun = ('workers', 'processes') else: worker_noun, process_noun = ('worker', 'process') + evenly_divided = math.ceil(len(session.items)/self.workers) + tests_per_worker = min(self.tests_per_worker, evenly_divided) if tests_per_worker > 1: test_noun, thread_noun = ('tests', 'threads') else: @@ -328,7 +344,7 @@ def send_response(self, event_name, **arguments): self.responses_queue.put((event_name, arguments)) def pytest_runtest_logreport(self, report): - # We want workers to report to it's master. + # We want workers to report to its master. # Without this "if", master will try to report to itself. if self._config.parallel_worker: data = self._config.hook.pytest_report_to_serializable(