Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions Common/Util/DynamicWorkerPool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

using System;
using System.Collections.Generic;
using System.Threading;

namespace QuantConnect.Util
{
/// <summary>
/// A worker pool that routes items into queues by key and processes each queue with its own thread.
/// It starts with a minimum number of workers and adds more on demand (up to a maximum) when a queue
/// starts to pile up. Items sharing a key go to the same queue, so they keep their relative order.
/// </summary>
/// <typeparam name="T">The item type being processed</typeparam>
public class DynamicWorkerPool<T> : IDisposable
{
private readonly Action<T> _handler;
private readonly Action<Exception> _onError;
private readonly string _threadName;
private readonly int _minWorkers;
private readonly int _maxWorkers;

private readonly BusyBlockingCollection<T>[] _queues;
private readonly List<Thread> _workers;
private readonly CancellationTokenSource _cancellationTokenSource;
private readonly object _lock = new object();
private int _activeWorkers;
private bool _disposed;

/// <summary>
/// The number of worker threads currently running
/// </summary>
public int WorkerCount => Volatile.Read(ref _activeWorkers);

/// <summary>
/// True while any queue still has items to process
/// </summary>
public bool IsBusy
{
get
{
if (Volatile.Read(ref _disposed))
{
return false;
}
for (var i = 0; i < _queues.Length; i++)
{
if (_queues[i].IsBusy)
{
return true;
}
}
return false;
}
}

/// <summary>
/// Initializes a new instance of the <see cref="DynamicWorkerPool{T}"/> class
/// </summary>
/// <param name="handler">The action invoked to process each item</param>
/// <param name="minWorkers">The number of worker threads to start with (at least 1)</param>
/// <param name="maxWorkers">The maximum number of worker threads the pool can grow to</param>
/// <param name="onError">Optional callback invoked when the handler throws an unexpected exception</param>
/// <param name="threadName">Optional name prefix used for the worker threads</param>
public DynamicWorkerPool(Action<T> handler, int minWorkers, int maxWorkers, Action<Exception> onError = null, string threadName = "DynamicWorkerPool")
{
_handler = handler ?? throw new ArgumentNullException(nameof(handler));
_maxWorkers = Math.Max(1, maxWorkers);
_minWorkers = Math.Min(Math.Max(1, minWorkers), _maxWorkers);
_onError = onError;
_threadName = threadName;

_queues = new BusyBlockingCollection<T>[_maxWorkers];
for (var i = 0; i < _maxWorkers; i++)
{
_queues[i] = new BusyBlockingCollection<T>();
}
_workers = new List<Thread>(_maxWorkers);
_cancellationTokenSource = new CancellationTokenSource();
}

/// <summary>
/// Starts the pool with the minimum number of worker threads. Idempotent.
/// </summary>
public void Start()
{
lock (_lock)
{
if (_workers.Count > 0)
{
return;
}
for (var i = 0; i < _minWorkers; i++)
{
StartWorker(i);
}
_activeWorkers = _minWorkers;
}
}

/// <summary>
/// Routes an item to a queue by <paramref name="key"/> and adds a worker if that queue is piling up
/// </summary>
/// <param name="key">The routing key; the same key maps to the same queue while the pool size is stable</param>
/// <param name="item">The item to process</param>
public void Enqueue(long key, T item)
{
var active = Volatile.Read(ref _activeWorkers);
var index = (int)(key % active);
if (index < 0)
{
index += active;
}

var queue = _queues[index];
queue.Add(item);

// the queue is piling up faster than its worker can process it: grow the pool
if (active < _maxWorkers && queue.Count > 1)
{
Grow();
}
}

/// <summary>
/// Waits until all queues are empty and idle, or the timeout elapses
/// </summary>
/// <returns>True if the pool became idle, false on timeout</returns>
public bool WaitForIdle(TimeSpan timeout)
{
var deadline = DateTime.UtcNow + timeout;
while (IsBusy)
{
if (DateTime.UtcNow >= deadline)
{
return false;
}
Thread.Sleep(1);
}
return true;
}

/// <summary>
/// Stops the pool, signaling the workers to exit and waiting for them to finish
/// </summary>
public void Dispose()
{
lock (_lock)
{
if (_disposed)
{
return;
}
_disposed = true;
}

_cancellationTokenSource.Cancel();
foreach (var queue in _queues)
{
queue.CompleteAdding();
}

lock (_lock)
{
foreach (var worker in _workers)
{
worker?.StopSafely(TimeSpan.FromSeconds(5), _cancellationTokenSource);
}
}

foreach (var queue in _queues)
{
queue.DisposeSafely();
}
_cancellationTokenSource.DisposeSafely();
}

private void Grow()
{
lock (_lock)
{
if (_activeWorkers >= _maxWorkers || _cancellationTokenSource.IsCancellationRequested)
{
return;
}
StartWorker(_activeWorkers);
// publish the new worker only after it has started so routing never targets a missing queue
_activeWorkers++;
}
}

private void StartWorker(int index)
{
var worker = new Thread(() => WorkerLoop(index)) { IsBackground = true, Name = $"{_threadName} {index}" };
_workers.Add(worker);
worker.Start();
}

private void WorkerLoop(int index)
{
try
{
foreach (var item in _queues[index].GetConsumingEnumerable(_cancellationTokenSource.Token))
{
_handler(item);
}
}
catch (OperationCanceledException)
{
// shutting down
}
catch (Exception err)
{
_onError?.Invoke(err);
}
}
}
}
102 changes: 63 additions & 39 deletions Engine/TransactionHandlers/BrokerageTransactionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ public class BrokerageTransactionHandler : ITransactionHandler
/// </summary>
protected List<IBusyCollection<OrderRequest>> _orderRequestQueues { get; set; }

private List<Thread> _processingThreads;
// Worker pool for concurrent order processing, routed by OrderId, growing on demand. Null in the synchronous backtest path.
private DynamicWorkerPool<OrderRequest> _pool;

private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();

private readonly ConcurrentQueue<OrderEvent> _orderEvents = new ConcurrentQueue<OrderEvent>();
Expand Down Expand Up @@ -236,24 +238,45 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu
/// </summary>
protected virtual void InitializeTransactionThread()
{
// multi threaded queue, used for live deployments
var processingThreadsCount = _brokerage.ConcurrencyEnabled
? Config.GetInt("maximum-transaction-threads", 4)
: 1;
_orderRequestQueues = new(processingThreadsCount);
_processingThreads = new(processingThreadsCount);
for (var i = 0; i < processingThreadsCount; i++)
{
_orderRequestQueues.Add(new BusyBlockingCollection<OrderRequest>());
var threadId = i; // avoid modified closure
_processingThreads.Add(new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {i}" });
}
foreach (var thread in _processingThreads)
{
thread.Start();
}
// The pool starts with the minimum number of workers and grows up to the maximum on demand.
// Requests are routed by OrderId, and each order is processed by a single worker at a time,
// which preserves the Submit/Update/Cancel ordering per OrderId even as the pool scales.
var maxThreads = _brokerage.ConcurrencyEnabled ? Math.Max(1, MaximumTransactionThreads) : 1;
var minThreads = _brokerage.ConcurrencyEnabled ? Math.Min(Math.Max(1, MinimumTransactionThreads), maxThreads) : 1;

_pool = new DynamicWorkerPool<OrderRequest>(
request =>
{
HandleOrderRequest(request);
ProcessAsynchronousEvents();
},
minThreads,
maxThreads,
onError: err =>
{
// unexpected error, we need to close down shop
_algorithm.SetRuntimeError(err, "HandleOrderRequest");
IsActive = false;
},
threadName: "Transaction Thread");
_pool.Start();
}

/// <summary>
/// The maximum number of worker threads the dynamic transaction thread pool can grow to
/// </summary>
protected virtual int MaximumTransactionThreads => Config.GetInt("maximum-transaction-threads", 10);

/// <summary>
/// The number of worker threads the dynamic transaction thread pool starts with
/// </summary>
protected virtual int MinimumTransactionThreads => Config.GetInt("minimum-transaction-threads", 2);

/// <summary>
/// The number of worker threads currently running in the dynamic transaction thread pool
/// </summary>
protected int ProcessingThreadsCount => _pool?.WorkerCount ?? 0;

/// <summary>
/// Boolean flag indicating the Run thread method is busy.
/// False indicates it is completely finished processing and ready to be terminated.
Expand Down Expand Up @@ -674,7 +697,8 @@ public List<Order> GetOpenOrders(Func<Order, bool> filter = null)
}

/// <summary>
/// Primary thread entry point to launch the transaction thread.
/// Processes the order request queue synchronously. Used by the backtesting transaction handler,
/// which processes order requests on the algorithm thread instead of using the worker pool.
/// </summary>
protected void Run(int threadId)
{
Expand All @@ -691,12 +715,6 @@ protected void Run(int threadId)
// unexpected error, we need to close down shop
_algorithm.SetRuntimeError(err, "HandleOrderRequest");
}

if (_processingThreads != null)
{
Log.Trace($"BrokerageTransactionHandler.Run(): Ending Thread {threadId}...");
IsActive = false;
}
}

/// <summary>
Expand All @@ -717,7 +735,11 @@ public virtual void ProcessSynchronousEvents()
// in backtesting we need to wait for orders to be removed from the queue and finished processing
if (!_algorithm.LiveMode)
{
if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token)))
if (_orderRequestQueues != null && _orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(Time.OneSecond, _cancellationTokenSource.Token)))
{
Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing.");
}
else if (_pool != null && !_pool.WaitForIdle(Time.OneSecond))
{
Log.Error("BrokerageTransactionHandler.ProcessSynchronousEvents(): Timed out waiting for request queue to finish processing.");
}
Expand Down Expand Up @@ -800,23 +822,14 @@ public void AddOpenOrder(Order order, IAlgorithm algorithm)
public void Exit()
{
var timeout = TimeSpan.FromSeconds(60);
if (_processingThreads != null)
if (_pool != null)
{
// only wait if the processing thread is running
if (_orderRequestQueues.Any(queue => queue.IsBusy && !queue.WaitHandle.WaitOne(timeout)))
// wait for the pool to finish processing pending requests, then stop the workers
if (!_pool.WaitForIdle(timeout))
{
Log.Error("BrokerageTransactionHandler.Exit(): Exceed timeout: " + (int)(timeout.TotalSeconds) + " seconds.");
}

foreach (var queue in _orderRequestQueues)
{
queue.CompleteAdding();
}

foreach (var thread in _processingThreads)
{
thread?.StopSafely(timeout, _cancellationTokenSource);
}
_pool.DisposeSafely();
}
IsActive = false;
_cancellationTokenSource.DisposeSafely();
Expand Down Expand Up @@ -1939,12 +1952,23 @@ private string GetShortableErrorMessage(Symbol symbol, decimal quantity)

private void EnqueueOrderRequest(OrderRequest request, Order order)
{
// route by OrderId (or combo group id) so all requests for the same order are processed
// in order by a single worker; the pool keeps the routing stable as it scales
var queueKey = request.OrderId;
if (order.GroupOrderManager?.Id > 0)
{
queueKey = order.GroupOrderManager.Id;
}
_orderRequestQueues[queueKey % _orderRequestQueues.Count].Add(request);

if (_pool != null)
{
_pool.Enqueue(queueKey, request);
}
else
{
// synchronous backtest path: a single queue processed on the algorithm thread
_orderRequestQueues[(int)(queueKey % _orderRequestQueues.Count)].Add(request);
}
}

/// <summary>
Expand Down
Loading
Loading