From 2e103e1ba652bc2fcd455c93b05056dde956dcb4 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Fri, 11 Sep 2015 17:47:26 -0500 Subject: [PATCH 1/4] Larger pending queue depth. --- docs/sphinx/configuration.rst | 5 ++ src/gofer/agent/config.py | 8 +++ src/gofer/agent/plugin.py | 2 +- src/gofer/agent/rmi.py | 6 +- src/gofer/rmi/store.py | 92 ++++++++++++++++++++----- test/functional/plugins/testplugin.conf | 8 +++ 6 files changed, 101 insertions(+), 20 deletions(-) diff --git a/docs/sphinx/configuration.rst b/docs/sphinx/configuration.rst index 78599450..9836d07b 100644 --- a/docs/sphinx/configuration.rst +++ b/docs/sphinx/configuration.rst @@ -115,6 +115,11 @@ Defines basic plugin properties. File extensions just be (.conf|.json). +[pending] + +- **depth** - The pending queue depth. Default: 100K + + [model] ------- diff --git a/src/gofer/agent/config.py b/src/gofer/agent/config.py index b964dc36..40efc76d 100644 --- a/src/gofer/agent/config.py +++ b/src/gofer/agent/config.py @@ -107,6 +107,11 @@ ('authenticator', OPTIONAL, ANY), ) ), + ('pending', OPTIONAL, + ( + ('depth', REQUIRED, NUMBER), + ) + ), ('model', OPTIONAL, ( ('managed', OPTIONAL, '(0|1|2)'), @@ -134,6 +139,9 @@ 'accept': ',', 'forward': ',' }, + 'pending': { + 'depth': '100000' + }, 'model': { 'managed': '2' } diff --git a/src/gofer/agent/plugin.py b/src/gofer/agent/plugin.py index 33707302..aeac3071 100644 --- a/src/gofer/agent/plugin.py +++ b/src/gofer/agent/plugin.py @@ -203,7 +203,7 @@ def __init__(self, descriptor, path): self.actions = [] self.dispatcher = Dispatcher() self.whiteboard = Whiteboard() - self.scheduler = Scheduler(self) + self.scheduler = Scheduler(self, int(descriptor.pending.depth)) self.delegate = Delegate() self.authenticator = None self.consumer = None diff --git a/src/gofer/agent/rmi.py b/src/gofer/agent/rmi.py index a4a01729..d7e823ab 100644 --- a/src/gofer/agent/rmi.py +++ b/src/gofer/agent/rmi.py @@ -146,14 +146,16 @@ class Scheduler(Thread): Processes the *pending* queue. """ - def __init__(self, plugin): + def __init__(self, plugin, depth): """ :param plugin: A plugin. :type plugin: gofer.agent.plugin.Plugin + :param depth: The max queue depth. + :type depth: int """ Thread.__init__(self, name='scheduler:%s' % plugin.stream) self.plugin = plugin - self.pending = Pending(plugin.stream) + self.pending = Pending(plugin.stream, depth) self.builtin = Builtin(plugin) self.setDaemon(True) diff --git a/src/gofer/rmi/store.py b/src/gofer/rmi/store.py index de2d3bca..fd246dfd 100644 --- a/src/gofer/rmi/store.py +++ b/src/gofer/rmi/store.py @@ -24,6 +24,7 @@ from Queue import Queue, Empty from gofer import NAME, Thread, Singleton +from gofer.common import utf8 from gofer.common import mkdir, rmdir, unlink from gofer.messaging import Document from gofer.rmi.tracker import Tracker @@ -32,6 +33,22 @@ log = getLogger(__name__) +class FileCorrupted(Exception): + """ + File corrupted and likely discarded. + """ + + def __init__(self, path): + super(FileCorrupted, self).__init__(path) + + @property + def path(self): + return self.args[0] + + def __str__(self): + return "File %s corrupted. Discarded" % self.path + + class Pending(object): """ Persistent store and queuing for pending requests. @@ -41,6 +58,13 @@ class Pending(object): PENDING = '/var/lib/%s/messaging/pending' % NAME + # The queue depth + MAX_DEPTH = 100000 + + # The soft threshold determines when the journal + # file path is queued instead of the actual request + SOFT_THRESHOLD = 50 + @staticmethod def _write(request, path): """ @@ -54,7 +78,7 @@ def _write(request, path): try: body = request.dump() fp.write(body) - log.debug('wrote [%s]: %s', path, body) + log.debug('Writing [%s]: %s', path, body) finally: fp.close() @@ -66,6 +90,7 @@ def _read(path): :type path: str :return: The read request. :rtype: Document + :raise FileCorrupted: """ fp = open(path) try: @@ -73,11 +98,11 @@ def _read(path): request = Document() body = fp.read() request.load(body) - log.debug('read [%s]: %s', path, body) + log.debug('Read [%s]: %s', path, body) return request except ValueError: - log.error('%s corrupt (discarded)', path) os.unlink(path) + raise FileCorrupted(path) finally: fp.close() @@ -91,13 +116,15 @@ def _list(self): paths = [os.path.join(path, name) for name in os.listdir(path)] return sorted(paths) - def __init__(self, stream): + def __init__(self, stream, depth=MAX_DEPTH): """ :param stream: The stream name. :type stream: str + :param depth: The queue depth. + :type depth: int """ self.stream = stream - self.queue = Queue(maxsize=100) + self.queue = Queue(maxsize=depth) self.is_open = False self.sequential = Sequential() self.journal = {} @@ -116,11 +143,11 @@ def _open(self): log.info('Using: %s', path) for path in self._list(): log.info('Restoring: %s', path) - request = Pending._read(path) - if not request: - # read failed - continue - self._put(request, path) + try: + request = Pending._read(path) + self._put(request, path) + except FileCorrupted, fe: + log.debug(utf8(fe)) self.is_open = True def put(self, request): @@ -147,7 +174,7 @@ def get(self): """ while not Thread.aborted(): try: - return self.queue.get(timeout=10) + return self._get(timeout=10) except Empty: pass @@ -161,9 +188,9 @@ def commit(self, sn): try: path = self.journal[sn] unlink(path) - log.debug('%s committed', sn) + log.debug('Request %s committed', sn) except KeyError: - log.warn('%s not found for commit', sn) + log.warn('Request %s not found for commit', sn) def delete(self): """ @@ -175,7 +202,7 @@ def delete(self): self._drain() path = os.path.join(Pending.PENDING, self.stream) rmdir(path) - log.info('%s, deleted', path) + log.info('File %s deleted', path) def _drain(self): """ @@ -184,24 +211,55 @@ def _drain(self): self.is_open = False while not Thread.aborted(): try: - request = self.queue.get(timeout=1) + request = self._get(timeout=1) self.commit(request.sn) except Empty: break + def _get(self, timeout=10): + """ + Get the next pending request to be dispatched. + The queued *thing* can be either a request or the path to + a journal file. When a path is detected, the file is read + and the actual request is read. + :return: The next pending request. + :rtype: Document + :raise Empty: when queue empty. + """ + while True: + if Thread.aborted(): + raise Empty() + thing = self.queue.get(timeout=timeout) + if isinstance(thing, str): + try: + request = Pending._read(thing) + except (IOError, OSError, FileCorrupted), fe: + log.warn(utf8(fe)) + continue + else: + request = thing + request.ts = time() + return request + def _put(self, request, jnl_path): """ Enqueue the request. + When the queue depth threshold is exceeded, the path to the + journal file is queued instead of the actual request. This + is done to reduce memory footprint for long backlogs. :param request: An AMQP request. :type request: Document :param jnl_path: Path to the associated journal file. :type jnl_path: str """ - request.ts = time() tracker = Tracker() tracker.add(request.sn, request.data) self.journal[request.sn] = jnl_path - self.queue.put(request) + if self.queue.qsize() > Pending.SOFT_THRESHOLD: + thing = jnl_path + else: + thing = request + self.queue.put(thing) class Sequential(object): diff --git a/test/functional/plugins/testplugin.conf b/test/functional/plugins/testplugin.conf index c72dce32..166d562d 100644 --- a/test/functional/plugins/testplugin.conf +++ b/test/functional/plugins/testplugin.conf @@ -27,6 +27,11 @@ # host_validation # The (optional) flag indicates SSL host validation should be performed. # +# [pending] +# +# depth +# The pending queue depth. Default: 100K +# # [model] # # managed @@ -47,6 +52,9 @@ enabled=1 forward=* +[pending] +depth=500 + [messaging] uuid=TEST url= From 78d97230bc3a3951fbe93588ca2e210cb1092280 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Fri, 11 Sep 2015 18:08:58 -0500 Subject: [PATCH 2/4] Fix unit tests. --- test/unit/agent/test_plugin.py | 27 ++++++++++++++++----------- test/unit/agent/test_rmi.py | 15 ++++++++------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/test/unit/agent/test_plugin.py b/test/unit/agent/test_plugin.py index 5330be3d..a3d24fd3 100644 --- a/test/unit/agent/test_plugin.py +++ b/test/unit/agent/test_plugin.py @@ -121,7 +121,8 @@ class TestPlugin(TestCase): @patch('gofer.agent.plugin.ThreadPool') def test_init(self, pool, dispatcher, whiteboard, scheduler, delegate): threads = 4 - descriptor = Mock(main=Mock(threads=threads)) + depth = 1234 + descriptor = Mock(main=Mock(threads=threads), pending=Mock(depth=depth)) path = '/tmp/path' # test @@ -130,7 +131,7 @@ def test_init(self, pool, dispatcher, whiteboard, scheduler, delegate): # validation pool.assert_called_once_with(threads) dispatcher.assert_called_once_with() - scheduler.assert_called_once_with(plugin) + scheduler.assert_called_once_with(plugin, depth) delegate.assert_called_once_with() self.assertEqual(plugin.descriptor, descriptor) self.assertEqual(plugin.path, path) @@ -156,6 +157,8 @@ def test_properties(self, connector, model): threads=4, forward='a, b, c', accept='d, e, f'), + pending=Mock( + depth=1234), messaging=Mock( uuid='x99', url='amqp://localhost') @@ -195,7 +198,7 @@ def test_properties(self, connector, model): @patch('gofer.agent.plugin.Whiteboard', Mock()) @patch('gofer.agent.plugin.ThreadPool', Mock()) def test_start(self, scheduler): - descriptor = Mock(main=Mock(threads=4)) + descriptor = Mock(main=Mock(threads=4), pending=Mock(depth=1234)) scheduler.return_value.isAlive.return_value = False # test @@ -211,7 +214,7 @@ def test_start(self, scheduler): @patch('gofer.agent.plugin.Whiteboard', Mock()) @patch('gofer.agent.plugin.ThreadPool', Mock()) def test_start_already_started(self, scheduler): - descriptor = Mock(main=Mock(threads=4)) + descriptor = Mock(main=Mock(threads=4), pending=Mock(depth=1234)) scheduler.return_value.isAlive.return_value = True # test @@ -227,7 +230,7 @@ def test_start_already_started(self, scheduler): @patch('gofer.agent.plugin.ThreadPool') @patch('gofer.agent.plugin.Whiteboard', Mock()) def test_shutdown(self, pool, scheduler): - descriptor = Mock(main=Mock(threads=4)) + descriptor = Mock(main=Mock(threads=4), pending=Mock(depth=1234)) scheduler.return_value.isAlive.return_value = True # test @@ -245,7 +248,7 @@ def test_shutdown(self, pool, scheduler): @patch('gofer.agent.plugin.ThreadPool') @patch('gofer.agent.plugin.Whiteboard', Mock()) def test_shutdown_not_running(self, pool, scheduler): - descriptor = Mock(main=Mock(threads=4)) + descriptor = Mock(main=Mock(threads=4), pending=Mock(depth=1234)) scheduler.return_value.isAlive.return_value = False # test @@ -269,6 +272,8 @@ def test_refresh(self, connector): main=Mock( enabled='1', threads=4), + pending=Mock( + depth=1234), messaging=Mock( uuid='x99', url='amqp://localhost', @@ -299,7 +304,7 @@ def test_refresh(self, connector): @patch('gofer.agent.plugin.Whiteboard', Mock()) def test_attach(self, pool, model, consumer, node): queue = 'test' - descriptor = Mock(main=Mock(threads=4)) + descriptor = Mock(main=Mock(threads=4), pending=Mock(depth=1234)) pool.return_value.run.side_effect = lambda fn: fn() model.return_value.queue = queue @@ -326,7 +331,7 @@ def test_attach(self, pool, model, consumer, node): @patch('gofer.agent.plugin.Scheduler', Mock()) @patch('gofer.agent.plugin.Whiteboard', Mock()) def test_detach(self, model): - descriptor = Mock(main=Mock(threads=4)) + descriptor = Mock(main=Mock(threads=4), pending=Mock(depth=1234)) consumer = Mock() # test @@ -347,7 +352,7 @@ def test_detach(self, model): @patch('gofer.agent.plugin.Scheduler', Mock()) @patch('gofer.agent.plugin.Whiteboard', Mock()) def test_detach_not_attached(self, model): - descriptor = Mock(main=Mock(threads=4)) + descriptor = Mock(main=Mock(threads=4), pending=Mock(depth=1234)) # test plugin = Plugin(descriptor, '') @@ -361,7 +366,7 @@ def test_detach_not_attached(self, model): @patch('gofer.agent.plugin.Scheduler', Mock()) @patch('gofer.agent.plugin.Whiteboard', Mock()) def test_detach_no_teardown(self, model): - descriptor = Mock(main=Mock(threads=4)) + descriptor = Mock(main=Mock(threads=4), pending=Mock(depth=1234)) consumer = Mock() # test @@ -379,7 +384,7 @@ def test_detach_no_teardown(self, model): @patch('gofer.agent.plugin.Scheduler', Mock()) @patch('gofer.agent.plugin.Whiteboard', Mock()) def test_provides(self): - descriptor = Mock(main=Mock(threads=4)) + descriptor = Mock(main=Mock(threads=4), pending=Mock(depth=1234)) # test plugin = Plugin(descriptor, '') diff --git a/test/unit/agent/test_rmi.py b/test/unit/agent/test_rmi.py index 323c92e0..02f5b52e 100644 --- a/test/unit/agent/test_rmi.py +++ b/test/unit/agent/test_rmi.py @@ -23,9 +23,10 @@ class TestScheduler(TestCase): @patch('gofer.agent.rmi.Pending') @patch('gofer.agent.rmi.Builtin') def test_init(self, builtin, pending, set_daemon): + depth = 1234 plugin = Mock() - scheduler = Scheduler(plugin) - pending.assert_called_once_with(plugin.name) + scheduler = Scheduler(plugin, depth) + pending.assert_called_once_with(plugin.name, depth) builtin.assert_called_once_with(plugin) set_daemon.assert_called_with(True) self.assertEqual(scheduler.plugin, plugin) @@ -55,7 +56,7 @@ def test_run(self, builtin, pending, task, select_plugin, aborted): select_plugin.side_effect = [builtin.return_value, plugin] # test - scheduler = Scheduler(plugin) + scheduler = Scheduler(plugin, 243) scheduler.run() # validation @@ -88,7 +89,7 @@ def test_run_raised(self, aborted, select_plugin, pending): aborted.side_effect = [False, True] # test - scheduler = Scheduler(plugin) + scheduler = Scheduler(plugin, 243) scheduler.run() # validation @@ -100,7 +101,7 @@ def test_run_raised(self, aborted, select_plugin, pending): def test_select_plugin(self, builtin): plugin = Mock() request = Document(request={'classname': 'A'}) - scheduler = Scheduler(plugin) + scheduler = Scheduler(plugin, 243) # find builtin builtin.return_value.provides.return_value = True selected = scheduler.select_plugin(request) @@ -122,7 +123,7 @@ def test_select_plugin(self, builtin): def test_add(self, pending): plugin = Mock() request = Mock() - scheduler = Scheduler(plugin) + scheduler = Scheduler(plugin, 243) scheduler.add(request) pending.return_value.put.assert_called_once_with(request) @@ -132,7 +133,7 @@ def test_add(self, pending): @patch('threading.Thread.setDaemon', Mock()) def test_shutdown(self, abort, builtin): plugin = Mock() - scheduler = Scheduler(plugin) + scheduler = Scheduler(plugin, 243) scheduler.shutdown() builtin.return_value.shutdown.assert_called_once_with() abort.assert_called_once_with() From 78f1c64bb8a2d6e38189112111dfc2e4752d9a45 Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Fri, 11 Sep 2015 18:35:38 -0500 Subject: [PATCH 3/4] plugin descriptors documentation headers updated with [pending] sections. --- etc/gofer/plugins/builtin.conf | 5 +++++ etc/gofer/plugins/package.conf | 5 +++++ etc/gofer/plugins/system.conf | 5 +++++ etc/gofer/plugins/virt.conf | 5 +++++ 4 files changed, 20 insertions(+) diff --git a/etc/gofer/plugins/builtin.conf b/etc/gofer/plugins/builtin.conf index 8b982169..36cc8290 100644 --- a/etc/gofer/plugins/builtin.conf +++ b/etc/gofer/plugins/builtin.conf @@ -14,6 +14,11 @@ # forward # Forward to. A comma (,) separated list of plugin names (,=none|*=all). # +# [pending] +# +# depth +# The pending queue depth. Default: 100K +# # [messaging] # # uuid diff --git a/etc/gofer/plugins/package.conf b/etc/gofer/plugins/package.conf index 8b982169..36cc8290 100644 --- a/etc/gofer/plugins/package.conf +++ b/etc/gofer/plugins/package.conf @@ -14,6 +14,11 @@ # forward # Forward to. A comma (,) separated list of plugin names (,=none|*=all). # +# [pending] +# +# depth +# The pending queue depth. Default: 100K +# # [messaging] # # uuid diff --git a/etc/gofer/plugins/system.conf b/etc/gofer/plugins/system.conf index 8b982169..36cc8290 100644 --- a/etc/gofer/plugins/system.conf +++ b/etc/gofer/plugins/system.conf @@ -14,6 +14,11 @@ # forward # Forward to. A comma (,) separated list of plugin names (,=none|*=all). # +# [pending] +# +# depth +# The pending queue depth. Default: 100K +# # [messaging] # # uuid diff --git a/etc/gofer/plugins/virt.conf b/etc/gofer/plugins/virt.conf index ac6f4ca7..7e5c7419 100644 --- a/etc/gofer/plugins/virt.conf +++ b/etc/gofer/plugins/virt.conf @@ -14,6 +14,11 @@ # forward # Forward to. A comma (,) separated list of plugin names (,=none|*=all). # +# [pending] +# +# depth +# The pending queue depth. Default: 100K +# # [messaging] # # uuid From d9631e07ddc86d5c07d953639543c25c95e76cfc Mon Sep 17 00:00:00 2001 From: Jeff Ortel Date: Mon, 14 Sep 2015 10:47:23 -0500 Subject: [PATCH 4/4] Default changed to 10k. --- docs/sphinx/configuration.rst | 2 +- etc/gofer/plugins/builtin.conf | 2 +- etc/gofer/plugins/package.conf | 2 +- etc/gofer/plugins/system.conf | 2 +- etc/gofer/plugins/virt.conf | 2 +- src/gofer/agent/config.py | 4 ++-- src/gofer/rmi/store.py | 2 +- 7 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/sphinx/configuration.rst b/docs/sphinx/configuration.rst index 16e19bc0..207351a8 100644 --- a/docs/sphinx/configuration.rst +++ b/docs/sphinx/configuration.rst @@ -131,7 +131,7 @@ File extensions just be (.conf|.json). [pending] -- **depth** - The pending queue depth. Default: 100K +- **depth** - The pending queue depth. Default: 10K [model] diff --git a/etc/gofer/plugins/builtin.conf b/etc/gofer/plugins/builtin.conf index 36cc8290..9c3fe47d 100644 --- a/etc/gofer/plugins/builtin.conf +++ b/etc/gofer/plugins/builtin.conf @@ -17,7 +17,7 @@ # [pending] # # depth -# The pending queue depth. Default: 100K +# The pending queue depth. Default: 10K # # [messaging] # diff --git a/etc/gofer/plugins/package.conf b/etc/gofer/plugins/package.conf index 36cc8290..9c3fe47d 100644 --- a/etc/gofer/plugins/package.conf +++ b/etc/gofer/plugins/package.conf @@ -17,7 +17,7 @@ # [pending] # # depth -# The pending queue depth. Default: 100K +# The pending queue depth. Default: 10K # # [messaging] # diff --git a/etc/gofer/plugins/system.conf b/etc/gofer/plugins/system.conf index 36cc8290..9c3fe47d 100644 --- a/etc/gofer/plugins/system.conf +++ b/etc/gofer/plugins/system.conf @@ -17,7 +17,7 @@ # [pending] # # depth -# The pending queue depth. Default: 100K +# The pending queue depth. Default: 10K # # [messaging] # diff --git a/etc/gofer/plugins/virt.conf b/etc/gofer/plugins/virt.conf index 7e5c7419..afb25d06 100644 --- a/etc/gofer/plugins/virt.conf +++ b/etc/gofer/plugins/virt.conf @@ -17,7 +17,7 @@ # [pending] # # depth -# The pending queue depth. Default: 100K +# The pending queue depth. Default: 10K # # [messaging] # diff --git a/src/gofer/agent/config.py b/src/gofer/agent/config.py index eb6d6311..48fb20a1 100644 --- a/src/gofer/agent/config.py +++ b/src/gofer/agent/config.py @@ -87,7 +87,7 @@ # [pending] # # depth -# The pending queue depth. Default: 100K +# The pending queue depth. Default: 10K # # [model] # @@ -166,7 +166,7 @@ 'heartbeat': '10' }, 'pending': { - 'depth': '100000' + 'depth': '10000' }, 'model': { 'managed': '2' diff --git a/src/gofer/rmi/store.py b/src/gofer/rmi/store.py index 539db9d8..ad3ab191 100644 --- a/src/gofer/rmi/store.py +++ b/src/gofer/rmi/store.py @@ -58,7 +58,7 @@ class Pending(object): PENDING = '/var/lib/%s/messaging/pending' % NAME # The queue depth - MAX_DEPTH = 100000 + MAX_DEPTH = 10000 # The soft threshold determines when the journal # file path is queued instead of the actual request