diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 4ea4f99868..6cfe04800e 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -52,6 +52,7 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/expand_topic_or_service_name.cpp src/rclcpp/executors/events_executor.cpp src/rclcpp/executors/events_executor_entities_collector.cpp + src/rclcpp/executors/events_queue.cpp src/rclcpp/executors/multi_threaded_executor.cpp src/rclcpp/executors/single_threaded_executor.cpp src/rclcpp/executors/static_executor_entities_collector.cpp diff --git a/rclcpp/include/rclcpp/executor_options.hpp b/rclcpp/include/rclcpp/executor_options.hpp index d5ec6d0da9..944dd928bf 100644 --- a/rclcpp/include/rclcpp/executor_options.hpp +++ b/rclcpp/include/rclcpp/executor_options.hpp @@ -24,18 +24,37 @@ namespace rclcpp { +enum class QueueStrategy +{ + CPU_PERFORMANCE, + LIMITED_EVENTS_WITH_TIME_ORDERING, + BOUNDED +}; + +struct QueueOptions +{ + QueueOptions() + : queue_strategy(QueueStrategy::CPU_PERFORMANCE), + max_events(1000) + {} + + QueueStrategy queue_strategy; + size_t max_events; +}; + /// Options to be passed to the executor constructor. struct ExecutorOptions { ExecutorOptions() : memory_strategy(rclcpp::memory_strategies::create_default_strategy()), context(rclcpp::contexts::get_global_default_context()), - max_conditions(0) + max_conditions(0), queue_options(QueueOptions()) {} rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy; rclcpp::Context::SharedPtr context; size_t max_conditions; + QueueOptions queue_options; }; namespace executor diff --git a/rclcpp/include/rclcpp/executors/events_executor.hpp b/rclcpp/include/rclcpp/executors/events_executor.hpp index df63a96535..02ce234721 100644 --- a/rclcpp/include/rclcpp/executors/events_executor.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor.hpp @@ -1,4 +1,4 @@ -// Copyright 2020 Open Source Robotics Foundation, Inc. +// Copyright 2021 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,18 +16,17 @@ #define RCLCPP__EXECUTORS__EVENTS_EXECUTOR_HPP_ #include +#include #include -#include #include #include "rclcpp/executor.hpp" #include "rclcpp/executors/events_executor_entities_collector.hpp" #include "rclcpp/executors/events_executor_notify_waitable.hpp" +#include "rclcpp/executors/events_queue.hpp" #include "rclcpp/executors/timers_manager.hpp" #include "rclcpp/node.hpp" -#include "rmw/listener_event_types.h" - namespace rclcpp { namespace executors @@ -174,7 +173,7 @@ class EventsExecutor : public rclcpp::Executor private: RCLCPP_DISABLE_COPY(EventsExecutor) - using EventQueue = std::queue; + using EventQueue = std::deque; // Executor callback: Push new events into the queue and trigger cv. // This function is called by the DDS entities when an event happened, @@ -186,14 +185,8 @@ class EventsExecutor : public rclcpp::Executor auto this_executor = const_cast( static_cast(executor_ptr)); - // Event queue mutex scope - { - std::unique_lock lock(this_executor->push_mutex_); - - this_executor->event_queue_.push(event); - } - // Notify that the event queue has some events in it. - this_executor->event_queue_cv_.notify_one(); + // Push event and notify the queue it has some events + this_executor->events_queue_->push_and_notify(event); } /// Extract and execute events from the queue until it is empty @@ -207,15 +200,11 @@ class EventsExecutor : public rclcpp::Executor execute_event(const rmw_listener_event_t & event); // Queue where entities can push events - EventQueue event_queue_; + EventsQueue::SharedPtr events_queue_; EventsExecutorEntitiesCollector::SharedPtr entities_collector_; EventsExecutorNotifyWaitable::SharedPtr executor_notifier_; - // Mutex to protect the insertion of events in the queue - std::mutex push_mutex_; - // Variable used to notify when an event is added to the queue - std::condition_variable event_queue_cv_; // Timers manager std::shared_ptr timers_manager_; }; diff --git a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp index 7a07d61375..0315e4efa4 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_entities_collector.hpp @@ -184,6 +184,39 @@ class EventsExecutorEntitiesCollector final rclcpp::Waitable::SharedPtr get_waitable(const void * waitable_id); + /// + /** + * Get the subscription qos depth corresponding + * to a subscription identifier + */ + RCLCPP_PUBLIC + size_t + get_subscription_qos_depth(const void * subscription_id); + + /// + /** + * Get the waitable qos depth corresponding + * to a waitable identifier + */ + RCLCPP_PUBLIC + size_t + get_waitable_qos_depth(const void * waitable_id); + + /// + /** + * Gets the QoS of the entities collector. + * This is useful for the events executor, when it has to + * decide if keep pushing events from this waitable into the qeueue + */ + RCLCPP_PUBLIC + rmw_qos_profile_t + get_actual_qos() const + { + rmw_qos_profile_t qos; + qos.depth = 0; + return qos; + } + /// /** * Add a weak pointer to a waitable @@ -258,6 +291,11 @@ class EventsExecutorEntitiesCollector final std::unordered_map weak_clients_map_; std::unordered_map weak_waitables_map_; + // Maps: entity identifiers to qos->depth from the entities registered in the executor + using QosDepthMap = std::unordered_map; + QosDepthMap qos_depth_subscriptions_map_; + QosDepthMap qos_depth_waitables_map_; + /// Executor using this entities collector object EventsExecutor * associated_executor_ = nullptr; /// Instance of the timers manager used by the associated executor diff --git a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp index ee7ed4a6dc..76a6f81e03 100644 --- a/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp +++ b/rclcpp/include/rclcpp/executors/events_executor_notify_waitable.hpp @@ -86,6 +86,21 @@ class EventsExecutorNotifyWaitable final : public EventWaitable return nullptr; } + /// + /** + * Gets the QoS of this notify waitable. + * This is useful for the events executor, when it has to + * decide if keep pushing events from this waitable into the qeueue + */ + RCLCPP_PUBLIC + rmw_qos_profile_t + get_actual_qos() const + { + rmw_qos_profile_t qos; + qos.depth = 0; + return qos; + } + private: std::list notify_guard_conditions_; }; diff --git a/rclcpp/include/rclcpp/executors/events_queue.hpp b/rclcpp/include/rclcpp/executors/events_queue.hpp new file mode 100644 index 0000000000..d1df52cd49 --- /dev/null +++ b/rclcpp/include/rclcpp/executors/events_queue.hpp @@ -0,0 +1,202 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__EXECUTORS__EVENTS_QUEUE_HPP_ +#define RCLCPP__EXECUTORS__EVENTS_QUEUE_HPP_ + +#include +#include +#include +#include +#include +#include + +#include "rclcpp/executor_options.hpp" +#include "rclcpp/executors/events_executor_entities_collector.hpp" +#include "rclcpp/macros.hpp" + +#include "rmw/listener_event_types.h" + +namespace rclcpp +{ +namespace executors +{ + +/** + * @brief This class provides a queue implementation which supports + * different strategies for queueing and pruning events in case of + * queue overruns. + * + */ +class EventsQueue +{ +public: + RCLCPP_SMART_PTR_DEFINITIONS(EventsQueue) + + using EventQueue = std::deque; + + /** + * @brief Construct a new EventsQueue object + * @param collector The entities collector associated to this queue + * @param options The event queue options + */ + EventsQueue( + EventsExecutorEntitiesCollector::SharedPtr collector, + QueueOptions options); + + /** + * @brief Destruct the object. + */ + ~EventsQueue(); + + /** + * @brief swap EventQueue with another + * @param event_queue The queue to swap with member queue + */ + void swap(EventQueue & event_queue); + + /** + * @brief Waits for an event to happen or timeout + * @param timeout Time to wait and exit if no events received + */ + void wait_for_event(std::chrono::nanoseconds timeout); + + /** + * @brief Waits for an event and swap queues + * @param event_queue The queue where the events will be swapped to + */ + void wait_for_event_and_swap(EventQueue & event_queue); + + /** + * @brief Waits for an event or timeout and swap queues + * @param event_queue The queue where the events will be swapped to + * @param timeout Time to wait and swap if no events received + */ + void wait_for_event_and_swap( + EventQueue & event_queue, + std::chrono::nanoseconds timeout); + + /** + * @brief Waits for an event or timeout and get first (oldest) event + * @param event Where the event will be stored + * @param timeout Time to wait and exit if no events received + * @return true if there was an event, false if timeout time elapsed + */ + bool wait_and_get_first_event( + rmw_listener_event_t & event, + std::chrono::nanoseconds timeout); + + /** + * @brief push event into the queue and trigger cv + * @param event The event to push into the queue + */ + void push_and_notify(rmw_listener_event_t event); + +private: + RCLCPP_DISABLE_COPY(EventsQueue) + + // Function pointers to implement different queue strategies + using PushFunctionPtr = std::function; + using ClearStatsFunctionPtr = std::function; + + PushFunctionPtr push_and_notify_implem_; + ClearStatsFunctionPtr clear_stats_implem_; + + // + // QueueStrategy::CPU_PERFORMANCE + // + + /** + * @brief push event into the queue and trigger cv + * @param event The event to push into the queue + */ + void simple_push(rmw_listener_event_t event); + + // + // QueueStrategy::LIMITED_EVENTS_WITH_TIME_ORDERING + // + + /** + * @brief Follows policy in how to push to the queue + * Before pushing, counts how many events came from the entity + * and compares it with its QoS depth. It removes the old and + * adds a new event if one has expired, se we keep time ordering + * @param event The event to push into the queue + */ + void limited_events_push(rmw_listener_event_t event); + + /** + * @brief Remove oldest event and pushes new one in the back + * @param event The event to remove and push back into the queue + */ + void remove_oldest_event_and_push_back_new(rmw_listener_event_t event); + + /** + * @brief Informs if the amount of subscriptions events reached the limit + * @param subscription_id The subscription_id to obtain information + * @return true if reached limit + */ + bool subscription_events_reached_limit(const void * subscription_id); + + /** + * @brief Informs if the amount of waitable events reached the limit + * @param waitable_id The waitable_id to obtain information + * @return true if reached limit + */ + bool waitable_events_reached_limit(const void * waitable_id); + + /** + * @brief Clears events queue stats + */ + void clear_stats(); + + // + // QueueStrategy::BOUNDED + // + + /** + * @brief push event into the queue and trigger cv + * @param event The event to push into the queue + */ + void bounded_push(rmw_listener_event_t event); + + /** + * @brief prune mechanism for qos bounded queue + */ + void bounded_prune(); + + size_t queue_limit_; + + // The underlying queue + EventQueue event_queue_; + + // Mutex to protect the insertion of events in the queue + std::mutex push_mutex_; + + // Variable used to notify when an event is added to the queue + std::condition_variable event_queue_cv_; + + // Entities collector associated with executor and events queue + EventsExecutorEntitiesCollector::SharedPtr entities_collector_; + + // Maps: entity identifiers to number of events in the queue + using EventsInQueueMap = std::unordered_map; + EventsInQueueMap subscription_events_in_queue_map_; + EventsInQueueMap waitable_events_in_queue_map_; +}; + +} // namespace executors +} // namespace rclcpp + +#endif // RCLCPP__EXECUTORS__EVENTS_QUEUE_HPP_ diff --git a/rclcpp/include/rclcpp/waitable.hpp b/rclcpp/include/rclcpp/waitable.hpp index e4d61f1732..babc6aec51 100644 --- a/rclcpp/include/rclcpp/waitable.hpp +++ b/rclcpp/include/rclcpp/waitable.hpp @@ -215,6 +215,11 @@ class Waitable const rclcpp::executors::EventsExecutor * executor, rmw_listener_cb_t executor_callback) const; + RCLCPP_PUBLIC + virtual + rmw_qos_profile_t + get_actual_qos() const; + private: std::atomic in_use_by_wait_set_{false}; }; // class Waitable diff --git a/rclcpp/src/rclcpp/executors/events_executor.cpp b/rclcpp/src/rclcpp/executors/events_executor.cpp index 97a673c5cf..5ff5700f11 100644 --- a/rclcpp/src/rclcpp/executors/events_executor.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor.cpp @@ -1,4 +1,4 @@ -// Copyright 2020 Open Source Robotics Foundation, Inc. +// Copyright 2021 Open Source Robotics Foundation, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,12 +13,13 @@ // limitations under the License. #include -#include +#include #include #include #include #include "rclcpp/exceptions/exceptions.hpp" +#include "rclcpp/executor_options.hpp" #include "rclcpp/executors/events_executor.hpp" using namespace std::chrono_literals; @@ -33,6 +34,10 @@ EventsExecutor::EventsExecutor( entities_collector_ = std::make_shared(this); entities_collector_->init(); + events_queue_ = std::make_shared( + entities_collector_, + options.queue_options); + // Setup the executor notifier to wake up the executor when some guard conditions are tiggered. // The added guard conditions are guaranteed to not go out of scope before the executor itself. executor_notifier_ = std::make_shared(); @@ -50,22 +55,13 @@ EventsExecutor::spin() } RCLCPP_SCOPE_EXIT(this->spinning.store(false);); - // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; + timers_manager_->start(); // Local event queue to allow entities to push events while we execute them EventQueue execution_event_queue; - timers_manager_->start(); - while (rclcpp::ok(context_) && spinning.load()) { - std::unique_lock push_lock(push_mutex_); - // We wait here until something has been pushed to the event queue - event_queue_cv_.wait(push_lock, has_event_predicate); - // We got an event! Swap queues - std::swap(execution_event_queue, event_queue_); - // After swapping the queues, we don't need the lock anymore - push_lock.unlock(); + events_queue_->wait_for_event_and_swap(execution_event_queue); // Consume all available events, this queue will be empty at the end of the function this->consume_all_events(execution_event_queue); } @@ -90,25 +86,15 @@ EventsExecutor::spin_some(std::chrono::nanoseconds max_duration) // - A timer triggers // - An executor event is received and processed - // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; - - // Local event queue to allow entities to push events while we execute them - EventQueue execution_event_queue; - // Select the smallest between input max_duration and timer timeout auto next_timer_timeout = timers_manager_->get_head_timeout(); if (next_timer_timeout < max_duration) { max_duration = next_timer_timeout; } - std::unique_lock push_lock(push_mutex_); - // Wait until timeout or event - event_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); - // Time to swap queues as the wait is over - std::swap(execution_event_queue, event_queue_); - // After swapping the queues, we don't need the lock anymore - push_lock.unlock(); + // Local event queue to allow entities to push events while we execute them + EventQueue execution_event_queue; + events_queue_->wait_for_event_and_swap(execution_event_queue, max_duration); // Execute all ready timers timers_manager_->execute_ready_timers(); @@ -124,16 +110,10 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) } if (spinning.exchange(true)) { - throw std::runtime_error("spin_some() called while already spinning"); + throw std::runtime_error("spin_all() called while already spinning"); } RCLCPP_SCOPE_EXIT(this->spinning.store(false);); - // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; - - // Local event queue to allow entities to push events while we execute them - EventQueue execution_event_queue; - auto start = std::chrono::steady_clock::now(); auto max_duration_not_elapsed = [max_duration, start]() { auto elapsed_time = std::chrono::steady_clock::now() - start; @@ -146,19 +126,17 @@ EventsExecutor::spin_all(std::chrono::nanoseconds max_duration) max_duration = next_timer_timeout; } - { - // Wait once until timeout or event - std::unique_lock push_lock(push_mutex_); - event_queue_cv_.wait_for(push_lock, max_duration, has_event_predicate); - } + // Wait once until timeout or event + events_queue_->wait_for_event(max_duration); auto timeout = timers_manager_->get_head_timeout(); + // Local event queue to allow entities to push events while we execute them + EventQueue execution_event_queue; + // Keep executing until no more work to do or timeout expired while (rclcpp::ok(context_) && spinning.load() && max_duration_not_elapsed()) { - std::unique_lock push_lock(push_mutex_); - std::swap(execution_event_queue, event_queue_); - push_lock.unlock(); + events_queue_->swap(execution_event_queue); // Exit if there is no more work to do const bool ready_timer = timeout < 0ns; @@ -187,24 +165,9 @@ EventsExecutor::spin_once_impl(std::chrono::nanoseconds timeout) timeout = next_timer_timeout; } - // When condition variable is notified, check this predicate to proceed - auto has_event_predicate = [this]() {return !event_queue_.empty();}; - rmw_listener_event_t event; - bool has_event = false; - - { - // Wait until timeout or event arrives - std::unique_lock lock(push_mutex_); - event_queue_cv_.wait_for(lock, timeout, has_event_predicate); - - // Grab first event from queue if it exists - has_event = !event_queue_.empty(); - if (has_event) { - event = event_queue_.front(); - event_queue_.pop(); - } - } + + auto has_event = events_queue_->wait_and_get_first_event(event, timeout); // If we wake up from the wait with an event, it means that it // arrived before any of the timers expired. @@ -257,7 +220,7 @@ EventsExecutor::consume_all_events(EventQueue & event_queue) { while (!event_queue.empty()) { rmw_listener_event_t event = event_queue.front(); - event_queue.pop(); + event_queue.pop_front(); this->execute_event(event); } diff --git a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp index c2b08a9de6..d3aaafcb44 100644 --- a/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp +++ b/rclcpp/src/rclcpp/executors/events_executor_entities_collector.cpp @@ -77,6 +77,8 @@ EventsExecutorEntitiesCollector::~EventsExecutorEntitiesCollector() weak_services_map_.clear(); weak_waitables_map_.clear(); weak_subscriptions_map_.clear(); + qos_depth_waitables_map_.clear(); + qos_depth_subscriptions_map_.clear(); weak_nodes_to_guard_conditions_.clear(); weak_groups_associated_with_executor_to_nodes_.clear(); weak_groups_to_nodes_associated_with_executor_.clear(); @@ -234,9 +236,13 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( group->find_subscription_ptrs_if( [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { if (subscription) { + qos_depth_subscriptions_map_.emplace( + subscription.get(), subscription->get_actual_qos().depth()); + subscription->set_events_executor_callback( associated_executor_, &EventsExecutor::push_event); + weak_subscriptions_map_.emplace(subscription.get(), subscription); } return false; @@ -264,9 +270,13 @@ EventsExecutorEntitiesCollector::set_callback_group_entities_callbacks( group->find_waitable_ptrs_if( [this](const rclcpp::Waitable::SharedPtr & waitable) { if (waitable) { + qos_depth_waitables_map_.emplace( + waitable.get(), waitable->get_actual_qos().depth); + waitable->set_events_executor_callback( associated_executor_, &EventsExecutor::push_event); + weak_waitables_map_.emplace(waitable.get(), waitable); } return false; @@ -292,6 +302,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( if (subscription) { subscription->set_events_executor_callback(nullptr, nullptr); weak_subscriptions_map_.erase(subscription.get()); + qos_depth_subscriptions_map_.erase(subscription.get()); } return false; }); @@ -316,6 +327,7 @@ EventsExecutorEntitiesCollector::unset_callback_group_entities_callbacks( if (waitable) { waitable->set_events_executor_callback(nullptr, nullptr); weak_waitables_map_.erase(waitable.get()); + qos_depth_waitables_map_.erase(waitable.get()); } return false; }); @@ -520,6 +532,7 @@ EventsExecutorEntitiesCollector::get_subscription(const void * subscription_id) // The subscription expired, remove from map weak_subscriptions_map_.erase(it); + qos_depth_subscriptions_map_.erase(subscription_id); } return nullptr; } @@ -577,6 +590,7 @@ EventsExecutorEntitiesCollector::get_waitable(const void * waitable_id) // The waitable expired, remove from map weak_waitables_map_.erase(it); + qos_depth_waitables_map_.erase(waitable_id); } return nullptr; } @@ -584,5 +598,34 @@ EventsExecutorEntitiesCollector::get_waitable(const void * waitable_id) void EventsExecutorEntitiesCollector::add_waitable(rclcpp::Waitable::SharedPtr waitable) { + qos_depth_waitables_map_.emplace(waitable.get(), waitable->get_actual_qos().depth); weak_waitables_map_.emplace(waitable.get(), waitable); } + +size_t +EventsExecutorEntitiesCollector::get_subscription_qos_depth(const void * subscription_id) +{ + auto it = qos_depth_subscriptions_map_.find(subscription_id); + + if (it != qos_depth_subscriptions_map_.end()) { + size_t qos_depth = it->second; + return qos_depth; + } + + // If the subscription_id is not present in the map, throw error + throw std::runtime_error("Event from subscription not registered in map!"); +} + +size_t +EventsExecutorEntitiesCollector::get_waitable_qos_depth(const void * waitable_id) +{ + auto it = qos_depth_waitables_map_.find(waitable_id); + + if (it != qos_depth_waitables_map_.end()) { + size_t qos_depth = it->second; + return qos_depth; + } + + // If the waitable_id is not present in the map, throw error + throw std::runtime_error("Event from waitable not registered in map!"); +} diff --git a/rclcpp/src/rclcpp/executors/events_queue.cpp b/rclcpp/src/rclcpp/executors/events_queue.cpp new file mode 100644 index 0000000000..3ce2856708 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/events_queue.cpp @@ -0,0 +1,350 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include "rclcpp/executors/events_queue.hpp" + +using rclcpp::executors::EventsQueue; +using namespace std::placeholders; + +EventsQueue::EventsQueue( + EventsExecutorEntitiesCollector::SharedPtr entities_collector, + QueueOptions queue_options) +: entities_collector_(entities_collector) +{ + switch (queue_options.queue_strategy) { + case QueueStrategy::CPU_PERFORMANCE: + { + push_and_notify_implem_ = std::bind( + &EventsQueue::simple_push, this, std::placeholders::_1); + clear_stats_implem_ = []() {}; + break; + } + + case QueueStrategy::LIMITED_EVENTS_WITH_TIME_ORDERING: + { + push_and_notify_implem_ = std::bind( + &EventsQueue::limited_events_push, this, std::placeholders::_1); + clear_stats_implem_ = std::bind(&EventsQueue::clear_stats, this); + break; + } + + case QueueStrategy::BOUNDED: + { + push_and_notify_implem_ = std::bind( + &EventsQueue::bounded_push, this, std::placeholders::_1); + clear_stats_implem_ = []() {}; + queue_limit_ = queue_options.max_events; + break; + } + } +} + +EventsQueue::~EventsQueue() +{ + subscription_events_in_queue_map_.clear(); + waitable_events_in_queue_map_.clear(); +} + +void EventsQueue::swap(EventQueue & event_queue) +{ + std::unique_lock push_lock(push_mutex_); + std::swap(event_queue, event_queue_); + clear_stats_implem_(); +} + +void EventsQueue::wait_for_event(std::chrono::nanoseconds timeout) +{ + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !event_queue_.empty();}; + + std::unique_lock push_lock(push_mutex_); + event_queue_cv_.wait_for(push_lock, timeout, has_event_predicate); +} + +void EventsQueue::wait_for_event_and_swap(EventQueue & event_queue) +{ + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !event_queue_.empty();}; + + std::unique_lock push_lock(push_mutex_); + // We wait here until something has been pushed to the event queue + event_queue_cv_.wait(push_lock, has_event_predicate); + // We got an event! Swap queues and clear maps + std::swap(event_queue, event_queue_); + clear_stats_implem_(); +} + +void EventsQueue::wait_for_event_and_swap( + EventQueue & event_queue, + std::chrono::nanoseconds timeout) +{ + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !event_queue_.empty();}; + + std::unique_lock push_lock(push_mutex_); + // We wait here until something has been pushed to the queue + // or if the timeout expired + event_queue_cv_.wait_for(push_lock, timeout, has_event_predicate); + // Time to swap queues as the wait is over + std::swap(event_queue, event_queue_); + clear_stats_implem_(); +} + +bool EventsQueue::wait_and_get_first_event( + rmw_listener_event_t & event, + std::chrono::nanoseconds timeout) +{ + // When condition variable is notified, check this predicate to proceed + auto has_event_predicate = [this]() {return !event_queue_.empty();}; + + // Wait until timeout or event arrives + std::unique_lock lock(push_mutex_); + + event_queue_cv_.wait_for(lock, timeout, has_event_predicate); + + // Grab first event (oldest) from queue if it exists + bool has_event = !event_queue_.empty(); + + if (has_event) { + event = event_queue_.front(); + event_queue_.pop_front(); + return true; + } + + // If the timeout expired without new events, return false + return false; +} + +// +// Queue Policies/Strategies implementations +// + +void EventsQueue::push_and_notify(rmw_listener_event_t event) +{ + // This function depends on the queue policy selected + push_and_notify_implem_(event); + + // Notify that we have an event in the queue + event_queue_cv_.notify_one(); +} + +// +// QueueStrategy::CPU_PERFORMANCE +// + +void EventsQueue::simple_push(rmw_listener_event_t event) +{ + std::unique_lock lock(push_mutex_); + event_queue_.push_back(event); +} + +// +// QueueStrategy::LIMITED_EVENTS_WITH_TIME_ORDERING +// + +void EventsQueue::limited_events_push(rmw_listener_event_t event) +{ + std::unique_lock lock(push_mutex_); + + switch (event.type) { + case SUBSCRIPTION_EVENT: + { + bool limit_reached = subscription_events_reached_limit(event.entity); + + if (limit_reached) { + remove_oldest_event_and_push_back_new(event); + } else { + event_queue_.push_back(event); + break; + } + } + + case SERVICE_EVENT: + case CLIENT_EVENT: + { + break; + } + + case WAITABLE_EVENT: + { + bool limit_reached = waitable_events_reached_limit(event.entity); + + if (limit_reached) { + remove_oldest_event_and_push_back_new(event); + } else { + event_queue_.push_back(event); + break; + } + } + } +} + +bool EventsQueue::subscription_events_reached_limit(const void * subscription_id) +{ + auto limit = entities_collector_->get_subscription_qos_depth(subscription_id); + + // If there's no limit, return false + if (!limit) { + return false; + } + + auto it = subscription_events_in_queue_map_.find(subscription_id); + + if (it != subscription_events_in_queue_map_.end()) { + size_t & subscription_events_in_queue = it->second; + + if (subscription_events_in_queue < limit) { + // Add one event as we still didn't reach the limit + subscription_events_in_queue++; + return false; + } else { + return true; + } + } + + // If the subscription_id is not present in the map, add it with one event in the counter + subscription_events_in_queue_map_.emplace(subscription_id, 1); + return false; +} + +bool EventsQueue::waitable_events_reached_limit(const void * waitable_id) +{ + auto limit = entities_collector_->get_waitable_qos_depth(waitable_id); + + // If there's no limit, return false + if (!limit) { + return false; + } + + auto it = waitable_events_in_queue_map_.find(waitable_id); + + if (it != waitable_events_in_queue_map_.end()) { + size_t & waitable_events_in_queue = it->second; + + if (waitable_events_in_queue < limit) { + // Add one event as we still didn't reach the limit + waitable_events_in_queue++; + return false; + } else { + return true; + } + } + + // If the waitable_id is not present in the map, add it with one event in the counter + waitable_events_in_queue_map_.emplace(waitable_id, 1); + return false; +} + +void EventsQueue::remove_oldest_event_and_push_back_new(rmw_listener_event_t event) +{ + // Remove first event (oldest) associated with this entity from the queue + EventQueue::iterator it; + + for (it = event_queue_.begin(); it != event_queue_.end(); it++) { + if ((*it).entity == event.entity) { + event_queue_.erase(it); + break; + } + } + + // Push event to the back of the queue (where newest messages are located) + event_queue_.push_back(event); +} + +void EventsQueue::clear_stats() +{ + subscription_events_in_queue_map_.clear(); + waitable_events_in_queue_map_.clear(); +} + +// +// QueueStrategy::BOUNDED +// + +void EventsQueue::bounded_push(rmw_listener_event_t event) +{ + std::unique_lock lock(push_mutex_); + + if (event_queue_.size() >= queue_limit_) { + bounded_prune(); + } + event_queue_.push_back(event); +} + +void EventsQueue::bounded_prune() +{ + // Start with fresh stats + clear_stats(); + + // Prune queue: + // For each entity, we get its QoS depth and use it as its events limit. + // Starting from the newer events (backward iterating the queue) we + // count events from each entity. If there are more events than the limit, + // we discard the oldest events. The + // For example, subscription A has depth = 1 / B has depth = 2 + // C has depth = 1 / D has depth = 1 + // If the queue is: + // Older Events (front of the queue) + // | D | + // | A | -> Should be removed, the msg has expired and overriden. + // | A | -> Should be removed + // | B | -> Should be removed | D | + // | C | | C | + // | B | | B | + // | A | ---> | A | + // | B | | B | + // Newer Events After pruning + // + EventQueue::reverse_iterator rit = event_queue_.rbegin(); + + while (rit != event_queue_.rend()) { + auto event = *rit; + + switch (event.type) { + case SUBSCRIPTION_EVENT: + { + bool limit_reached = subscription_events_reached_limit(event.entity); + + if (limit_reached) { + // Remove oldest events + rit = decltype(rit)(event_queue_.erase(std::next(rit).base())); + } else { + rit++; + } + break; + } + + case SERVICE_EVENT: + case CLIENT_EVENT: + { + break; + } + + case WAITABLE_EVENT: + { + bool limit_reached = waitable_events_reached_limit(event.entity); + + if (limit_reached) { + // Remove oldest events + rit = decltype(rit)(event_queue_.erase(std::next(rit).base())); + } else { + rit++; + } + break; + } + } + } +} diff --git a/rclcpp/src/rclcpp/waitable.cpp b/rclcpp/src/rclcpp/waitable.cpp index e30ff05f4e..1d7856a345 100644 --- a/rclcpp/src/rclcpp/waitable.cpp +++ b/rclcpp/src/rclcpp/waitable.cpp @@ -71,3 +71,10 @@ Waitable::set_events_executor_callback( throw std::runtime_error( "Custom waitables should override set_events_executor_callback() to use events executor"); } + +rmw_qos_profile_t +Waitable::get_actual_qos() const +{ + throw std::runtime_error( + "Custom waitables should override get_actual_qos() to use events executor"); +} diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp index a8393974d7..eaa84a4a41 100644 --- a/rclcpp/test/rclcpp/executors/test_executors.cpp +++ b/rclcpp/test/rclcpp/executors/test_executors.cpp @@ -484,6 +484,12 @@ class TestWaitable : public rclcpp::Waitable } } + rmw_qos_profile_t get_actual_qos() const override + { + rmw_qos_profile_t qos; + return qos; + } + private: size_t count_ = 0; rcl_guard_condition_t gc_;