Skip to content

Support ThreadPoolExecutor to serve simultaneous sync requests #3416

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions tornado/test/web_test.py
Original file line number Diff line number Diff line change
@@ -251,21 +251,19 @@ def get(self):
def get_app_kwargs(self):
return dict(template_path="FinalReturnTest")

def test_finish_method_return_future(self):
def test_finish_method_return_none(self):
response = self.fetch(self.get_url("/finish"))
self.assertEqual(response.code, 200)
self.assertIsInstance(self.final_return, Future)
self.assertTrue(self.final_return.done())
self.assertTrue(self.final_return is None)

response = self.fetch(self.get_url("/finish"), method="POST", body=b"")
self.assertEqual(response.code, 200)
self.assertIsInstance(self.final_return, Future)
self.assertTrue(self.final_return.done())
self.assertTrue(self.final_return is None)

def test_render_method_return_future(self):
def test_render_method_return_none(self):
response = self.fetch(self.get_url("/render"))
self.assertEqual(response.code, 200)
self.assertIsInstance(self.final_return, Future)
self.assertTrue(self.final_return is None)


class CookieTest(WebTestCase):
@@ -2531,17 +2529,23 @@ def get_handlers(self):
class TooHigh(RequestHandler):
def get(self):
self.set_header("Content-Length", "42")
self.finish("ok")

def _real_finish(self) -> "Future[None]":
try:
self.finish("ok")
return super()._real_finish()
except Exception as e:
test.server_error = e
raise

class TooLow(RequestHandler):
def get(self):
self.set_header("Content-Length", "2")
self.finish("hello")

def _real_finish(self) -> "Future[None]":
try:
self.finish("hello")
return super()._real_finish()
except Exception as e:
test.server_error = e
raise
@@ -2555,8 +2559,7 @@ def test_content_length_too_high(self):
with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
with ExpectLog(
gen_log,
"(Cannot send error response after headers written"
"|Failed to flush partial response)",
"Failed to flush response",
):
with self.assertRaises(HTTPClientError):
self.fetch("/high", raise_error=True)
@@ -2571,8 +2574,7 @@ def test_content_length_too_low(self):
with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
with ExpectLog(
gen_log,
"(Cannot send error response after headers written"
"|Failed to flush partial response)",
"Failed to flush response",
):
with self.assertRaises(HTTPClientError):
self.fetch("/low", raise_error=True)
112 changes: 70 additions & 42 deletions tornado/web.py
Original file line number Diff line number Diff line change
@@ -60,6 +60,7 @@ async def main():

"""

from asyncio import iscoroutine
import base64
import binascii
import datetime
@@ -70,6 +71,7 @@ async def main():
import hmac
import http.cookies
from inspect import isclass
from inspect import iscoroutinefunction
from io import BytesIO
import mimetypes
import numbers
@@ -96,6 +98,7 @@ async def main():
from tornado.log import access_log, app_log, gen_log
from tornado import template
from tornado.escape import utf8, _unicode
from tornado.ioloop import IOLoop
from tornado.routing import (
AnyMatches,
DefaultHostMatches,
@@ -216,8 +219,10 @@ def __init__(
self.application = application
self.request = request
self._headers_written = False
# When this flag is True, avoid further request writting, but finish() will be called later
self._finished = False
self._auto_finish = True
self._skip_finish_fn = False
self._finish_called = False
self._prepared_future = None
self.ui = ObjectDict(
(n, self._ui_method(m)) for n, m in application.ui_methods.items()
@@ -892,7 +897,7 @@ def redirect(
assert isinstance(status, int) and 300 <= status <= 399
self.set_status(status)
self.set_header("Location", utf8(url))
self.finish()
self._finished = True

def write(self, chunk: Union[str, bytes, dict]) -> None:
"""Writes the given chunk to the output buffer.
@@ -999,7 +1004,8 @@ def render(self, template_name: str, **kwargs: Any) -> "Future[None]":
if html_bodies:
hloc = html.index(b"</body>")
html = html[:hloc] + b"".join(html_bodies) + b"\n" + html[hloc:]
return self.finish(html)
self.write(html)
self._finished = True

def render_linked_js(self, js_files: Iterable[str]) -> str:
"""Default method used to render the final js links for the
@@ -1186,7 +1192,12 @@ def flush(self, include_footers: bool = False) -> "Future[None]":
future.set_result(None)
return future

def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[None]":
def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> None:
if chunk is not None:
self.write(chunk)
self._finished = True

def _real_finish(self) -> "Future[None]":
"""Finishes this response, ending the HTTP request.

Passing a ``chunk`` to ``finish()`` is equivalent to passing that
@@ -1201,12 +1212,11 @@ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[Non

Now returns a `.Future` instead of ``None``.
"""
if self._finished:
if self._skip_finish_fn:
return
if self._finish_called:
raise RuntimeError("finish() called twice")

if chunk is not None:
self.write(chunk)

# Automatically support ETags and add the Content-Length header if
# we have not flushed any content yet.
if not self._headers_written:
@@ -1238,6 +1248,7 @@ def finish(self, chunk: Optional[Union[str, bytes, dict]] = None) -> "Future[Non
future = self.flush(include_footers=True)
self.request.connection.finish()
self._log()
self._finish_called = True
self._finished = True
self.on_finish()
self._break_cycles()
@@ -1255,6 +1266,7 @@ def detach(self) -> iostream.IOStream:
.. versionadded:: 5.1
"""
self._finished = True
self._skip_finish_fn = True
# TODO: add detach to HTTPConnection?
return self.request.connection.detach() # type: ignore

@@ -1276,15 +1288,7 @@ def send_error(self, status_code: int = 500, **kwargs: Any) -> None:
"""
if self._headers_written:
gen_log.error("Cannot send error response after headers written")
if not self._finished:
# If we get an error between writing headers and finishing,
# we are unlikely to be able to finish due to a
# Content-Length mismatch. Try anyway to release the
# socket.
try:
self.finish()
except Exception:
gen_log.error("Failed to flush partial response", exc_info=True)
self._finished = True
return
self.clear()

@@ -1298,8 +1302,7 @@ def send_error(self, status_code: int = 500, **kwargs: Any) -> None:
self.write_error(status_code, **kwargs)
except Exception:
app_log.error("Uncaught exception in write_error", exc_info=True)
if not self._finished:
self.finish()
self._finished = True

def write_error(self, status_code: int, **kwargs: Any) -> None:
"""Override to implement custom error pages.
@@ -1318,13 +1321,13 @@ def write_error(self, status_code: int, **kwargs: Any) -> None:
self.set_header("Content-Type", "text/plain")
for line in traceback.format_exception(*kwargs["exc_info"]):
self.write(line)
self.finish()
else:
self.finish(
self.write(
"<html><title>%(code)d: %(message)s</title>"
"<body>%(code)d: %(message)s</body></html>"
% {"code": status_code, "message": self._reason}
)
self._finished = True

@property
def locale(self) -> tornado.locale.Locale:
@@ -1743,12 +1746,8 @@ def val(x: bytes) -> bytes:
break
return match

async def _execute(
self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes
) -> None:
"""Executes this request with the given output transforms."""
self._transforms = transforms
try:
async def _execute_no_err(self, *args: bytes, **kwargs: bytes):
if True:
if self.request.method not in self.SUPPORTED_METHODS:
raise HTTPError(405)
self.path_args = [self.decode_argument(arg) for arg in args]
@@ -1782,27 +1781,55 @@ async def _execute(
try:
await self.request._body_future
except iostream.StreamClosedError:
return
raise FinishExecute()

tornado_workers_executor = self.application.settings.get('tornado_workers_executor')
method = getattr(self, self.request.method.lower())
result = method(*self.path_args, **self.path_kwargs)
if iscoroutinefunction(method) or getattr(method, '__tornado_coroutine__', False):
result = await method(*self.path_args, **self.path_kwargs)
elif tornado_workers_executor:
result = await IOLoop.current().run_in_executor(
tornado_workers_executor,
functools.partial(method, *self.path_args, **self.path_kwargs))
else:
result = method(*self.path_args, **self.path_kwargs)
if result is not None:
result = await result
if self._auto_finish and not self._finished:
self.finish()
if iscoroutine(result):
app_log.warn(f'{method} returned a coroutine, you should await your own coroutines')
await result
else:
app_log.warn(f'{method} returned {result}, it was ignored')

async def _execute(
self, transforms: List["OutputTransform"], *args: bytes, **kwargs: bytes
) -> None:
"""Executes this request with the given output transforms."""
self._transforms = transforms
try:
await self._execute_no_err(*args, **kwargs)
except FinishExecute:
return
except Finish as e:
if e.args:
self.write(*e.args)
self._finished = True
except Exception as e:
try:
self._handle_request_exception(e)
except Exception:
app_log.error("Exception in exception handler", exc_info=True)
finally:
# Unset result to avoid circular references
result = None
if self._prepared_future is not None and not self._prepared_future.done():
# In case we failed before setting _prepared_future, do it
# now (to unblock the HTTP server). Note that this is not
# in a finally block to avoid GC issues prior to Python 3.4.
self._prepared_future.set_result(None)
finally:
if not self._finish_called:
try:
self._real_finish()
except Exception:
self.log_exception(*sys.exc_info())
gen_log.error("Failed to flush response", exc_info=True)

def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
"""Implement this method to handle streamed request data.
@@ -1830,11 +1857,6 @@ def _request_summary(self) -> str:
)

def _handle_request_exception(self, e: BaseException) -> None:
if isinstance(e, Finish):
# Not an error; just finish the request without logging.
if not self._finished:
self.finish(*e.args)
return
try:
self.log_exception(*sys.exc_info())
except Exception:
@@ -2518,6 +2540,11 @@ class Finish(Exception):
pass


class FinishExecute(Exception):
"""A convenience exception to just finish _execute() without calling finish()"""
pass


class MissingArgumentError(HTTPError):
"""Exception raised by `RequestHandler.get_argument`.

@@ -2677,8 +2704,8 @@ def reset(cls) -> None:
with cls._lock:
cls._static_hashes = {}

def head(self, path: str) -> Awaitable[None]:
return self.get(path, include_body=False)
async def head(self, path: str) -> Awaitable[None]:
return await self.get(path, include_body=False)

async def get(self, path: str, include_body: bool = True) -> None:
# Set up our path instance variables.
@@ -3148,6 +3175,7 @@ def initialize(
def prepare(self) -> None:
self.fallback(self.request)
self._finished = True
self._skip_finish_fn = True
self.on_finish()


1 change: 1 addition & 0 deletions tornado/websocket.py
Original file line number Diff line number Diff line change
@@ -930,6 +930,7 @@ async def _accept_connection(self, handler: WebSocketHandler) -> None:
handler.set_header("Connection", "Upgrade")
handler.set_header("Sec-WebSocket-Accept", self._challenge_response(handler))
handler.finish()
handler._real_finish()

self.stream = handler._detach_stream()